nsivabalan commented on code in PR #8758:
URL: https://github.com/apache/hudi/pull/8758#discussion_r1225438663


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -432,6 +468,7 @@ private boolean initializeFromFilesystem(String 
initializationTime, List<Metadat
       bulkCommit(commitTimeForPartition, partitionType, records, 
fileGroupCount);
       metadataMetaClient.reloadActiveTimeline();
       
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, 
partitionType, true);
+      initMetadataReader();

Review Comment:
   I don't think this is required.
   If there are no partitions to initialize, we initialize the reader at L259,
   and if there are any to initialize, we initialize the reader at L434.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the 
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object> 
{
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+  // The index to fallback upon when record index is not initialized yet.
+  // This should be a global index like record index so that the behavior of 
tagging across partitions is not changed.
+  private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE = 
IndexType.GLOBAL_SIMPLE;
+
+  public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public <R> HoodieData<HoodieRecord<R>> 
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context, 
HoodieTable hoodieTable) throws HoodieIndexException {
+    int fileGroupSize;
+    try {
+      
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+      fileGroupSize = 
hoodieTable.getMetadataTable().getNumShards(MetadataPartitionType.RECORD_INDEX);
+      ValidationUtils.checkState(fileGroupSize > 0, "Record index should have 
at least one file group");
+    } catch (TableNotFoundException | IllegalStateException e) {
+      // This means that record index has not been initialized.
+      LOG.warn(String.format("Record index not initialized so falling back to 
%s for tagging records", FALLBACK_INDEX_TYPE.name()));

Review Comment:
   If we don't fail here, how does the user know that the record index (RLI)
   has an issue and needs manual intervention?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -683,12 +683,19 @@ public void initializeBootstrapDirsIfNotExists() throws 
IOException {
     initializeBootstrapDirsIfNotExists(getHadoopConf(), basePath.toString(), 
getFs());
   }
 
+  /**
+   * Reload table config and cache.
+   */
+  public synchronized void reloadTableConfig() {

Review Comment:
   I see this is used only in tests. We should not add any APIs to source code
   just for test purposes.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -163,16 +160,16 @@ public List<String> 
getPartitionPathWithPathPrefixes(List<String> relativePathPr
   public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(List<String> keyPrefixes,
                                                                                
  String partitionName,
                                                                                
  boolean shouldLoadInMemory) {
-    // Sort the columns so that keys are looked up in order
+    // Sort the prefixes so that keys are looked up in order
     List<String> sortedKeyPrefixes = new ArrayList<>(keyPrefixes);
     Collections.sort(sortedKeyPrefixes);
 
     // NOTE: Since we partition records to a particular file-group by full 
key, we will have
     //       to scan all file-groups for all key-prefixes as each of these 
might contain some
     //       records matching the key-prefix
-    List<FileSlice> partitionFileSlices =
-        HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(
-            metadataMetaClient, metadataFileSystemView, partitionName);
+    List<FileSlice> partitionFileSlices = 
partitionFileSliceMap.computeIfAbsent(partitionName,

Review Comment:
   getRecordsByKeyPrefixes is called from within a Spark task. I am assuming
   you are making these changes with that in mind, i.e. partitionFileSliceMap
   will be on the driver and all executors/tasks will share it.
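
To make the sharing assumption concrete: if the object holding partitionFileSliceMap reaches tasks through Java serialization (an assumption here; the real class may mark the map transient or rebuild it), each task works on its own deserialized copy. Entries cached before serialization travel with the copy, but entries added afterwards are not visible to the driver or to other tasks. A minimal sketch using a hypothetical `SliceCacheSketch` class, not the Hudi implementation:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a metadata reader that caches file slices per partition.
final class SliceCacheSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  private final Map<String, List<String>> partitionFileSliceMap = new ConcurrentHashMap<>();
  private int loads = 0;

  // Returns the cached slices, running the (fake) listing only on a cache miss.
  List<String> getSlices(String partitionName) {
    return partitionFileSliceMap.computeIfAbsent(partitionName, k -> {
      loads++;
      return Arrays.asList(k + "-slice-0", k + "-slice-1");
    });
  }

  // Number of cache misses this copy has seen (the counter is serialized with the object).
  int loadCount() {
    return loads;
  }

  // Simulates shipping the object to a task: serialize, then deserialize a separate copy.
  static SliceCacheSketch roundTrip(SliceCacheSketch original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(original);
      }
      try (ObjectInputStream in =
               new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (SliceCacheSketch) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Serialization round trip failed", e);
    }
  }
}
```

If the cache is meant to be shared, populating it on the driver before the closure is serialized is the only way the tasks in this sketch would see it.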



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -193,121 +190,126 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(L
                   return Collections.emptyIterator();
                 }
 
-              boolean fullKeys = false;
+                boolean fullKeys = false;
 
-              Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> 
logRecords =
-                  readLogRecords(logRecordScanner, sortedKeyPrefixes, 
fullKeys, timings);
+                Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords =
+                    readLogRecords(logRecordScanner, sortedKeyPrefixes, 
fullKeys, timings);
 
-              List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
mergedRecords =
-                  readFromBaseAndMergeWithLogRecords(baseFileReader, 
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
+                Map<String, HoodieRecord<HoodieMetadataPayload>> mergedRecords 
=
+                    readFromBaseAndMergeWithLogRecords(baseFileReader, 
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
 
-              LOG.debug(String.format("Metadata read for %s keys took 
[baseFileRead, logMerge] %s ms",
-                  sortedKeyPrefixes.size(), timings));
+                LOG.debug(String.format("Metadata read for %s keys took 
[baseFileRead, logMerge] %s ms",
+                    sortedKeyPrefixes.size(), timings));
 
-              return mergedRecords.stream()
-                .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
-                .filter(Objects::nonNull)
-                .iterator();
-            } catch (IOException ioe) {
-              throw new HoodieIOException("Error merging records from metadata 
table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
-            } finally {
-              closeReader(readers);
-            }
-          });
+                return mergedRecords.values().iterator();
+              } catch (IOException ioe) {
+                throw new HoodieIOException("Error merging records from 
metadata table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
+              } finally {
+                closeReader(readers);
+              }
+            });
   }
 
   @Override
-  public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
getRecordsByKeys(List<String> keys,
-                                                                               
           String partitionName) {
-    // Sort the columns so that keys are looked up in order
-    List<String> sortedKeys = new ArrayList<>(keys);
-    Collections.sort(sortedKeys);
-    Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = 
getPartitionFileSliceToKeysMapping(partitionName, sortedKeys);
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = 
new ArrayList<>();
-    AtomicInteger fileSlicesKeysCount = new AtomicInteger();
-    partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, 
fileSliceKeys) -> {
-      Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
-          getOrCreateReaders(partitionName, partitionFileSlicePair.getRight());
-      try {
-        List<Long> timings = new ArrayList<>();
-        HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
-        HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
-        if (baseFileReader == null && logRecordScanner == null) {
-          return;
-        }
+  protected Map<String, HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeys(List<String> keys, String partitionName) {
+    if (keys.isEmpty()) {
+      return Collections.emptyMap();
+    }
 
-        boolean fullKeys = true;
-        Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
-            readLogRecords(logRecordScanner, fileSliceKeys, fullKeys, timings);
-
-        result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader, 
fileSliceKeys, fullKeys, logRecords,
-            timings, partitionName));
-
-        LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, 
logMerge] %s ms",
-            fileSliceKeys.size(), timings));
-        fileSlicesKeysCount.addAndGet(fileSliceKeys.size());
-      } catch (IOException ioe) {
-        throw new HoodieIOException("Error merging records from metadata table 
for  " + sortedKeys.size() + " key : ", ioe);
-      } finally {
-        if (!reuse) {
-          closeReader(readers);
-        }
+    Map<String, HoodieRecord<HoodieMetadataPayload>> result;
+
+    // Load the file slices for the partition. Each file slice is a shard 
which saves a portion of the keys.
+    List<FileSlice> partitionFileSlices = 
partitionFileSliceMap.computeIfAbsent(partitionName,
+        k -> 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
metadataFileSystemView, partitionName));
+    final int numFileSlices = partitionFileSlices.size();
+    ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for 
partition " + partitionName + " should be > 0");
+
+    // Lookup keys from each file slice
+    if (numFileSlices == 1) {
+      // Optimization for a single slice for smaller metadata table partitions
+      result = lookupKeysFromFileSlice(partitionName, keys, 
partitionFileSlices.get(0));
+    } else {
+      // Parallel lookup for large sized partitions with many file slices
+      // Partition the keys by the file slice which contains it
+      ArrayList<ArrayList<String>> partitionedKeys = new 
ArrayList<>(numFileSlices);
+      for (int i = 0; i < numFileSlices; ++i) {
+        partitionedKeys.add(new ArrayList<>());
       }
-    });
+      keys.forEach(key -> {
+        int shardIndex = 
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, numFileSlices);
+        partitionedKeys.get(shardIndex).add(key);
+      });
+
+      result = new HashMap<>(keys.size());
+      engineContext.setJobStatus(this.getClass().getSimpleName(), "Reading 
keys from metadata table partition " + partitionName);
+      engineContext.map(partitionedKeys, keysList -> {

Review Comment:
   Let's add javadocs calling out that this is a local engine context and that
   under the hood it uses Java parallel streams.
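
For illustration, a sketch of what "local engine context backed by parallel streams" means for this lookup. Everything here is hypothetical: `map` stands in for the engine context call, `shardOf` stands in for HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex (whose real hashing may differ), and the lookup itself is faked. Each shard task returns its own map, and the merge happens on the calling thread so that no shared HashMap is mutated from parallel workers:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class LocalParallelLookupSketch {

  // Hypothetical local-engine map(): applies func to each element using a parallel stream.
  static <I, O> List<O> map(List<I> data, Function<I, O> func) {
    return data.parallelStream().map(func).collect(Collectors.toList());
  }

  // Hypothetical shard mapping; always returns a value in [0, numShards).
  static int shardOf(String key, int numShards) {
    return Math.floorMod(key.hashCode(), numShards);
  }

  static Map<String, String> lookup(List<String> keys, int numShards) {
    // Partition the keys by the shard that would contain them.
    List<List<String>> partitionedKeys = new ArrayList<>(numShards);
    for (int i = 0; i < numShards; i++) {
      partitionedKeys.add(new ArrayList<>());
    }
    for (String key : keys) {
      partitionedKeys.get(shardOf(key, numShards)).add(key);
    }

    // One task per shard; each task builds and returns a private map.
    List<Map<String, String>> perShard = map(partitionedKeys, shardKeys -> {
      Map<String, String> found = new HashMap<>();
      for (String key : shardKeys) {
        found.put(key, "location-of-" + key); // fake lookup result
      }
      return found;
    });

    // Merge sequentially on the calling thread.
    Map<String, String> result = new HashMap<>(keys.size());
    perShard.forEach(result::putAll);
    return result;
  }
}
```

The javadoc could state this explicitly, since the parallelism here is bounded by the local JVM's thread pool rather than by cluster resources.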



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the 
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object> 
{
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+  // The index to fallback upon when record index is not initialized yet.
+  // This should be a global index like record index so that the behavior of 
tagging across partitions is not changed.
+  private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE = 
IndexType.GLOBAL_SIMPLE;
+
+  public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public <R> HoodieData<HoodieRecord<R>> 
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context, 
HoodieTable hoodieTable) throws HoodieIndexException {
+    int fileGroupSize;
+    try {
+      
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+      fileGroupSize = 
hoodieTable.getMetadataTable().getNumShards(MetadataPartitionType.RECORD_INDEX);
+      ValidationUtils.checkState(fileGroupSize > 0, "Record index should have 
at least one file group");
+    } catch (TableNotFoundException | IllegalStateException e) {
+      // This means that record index has not been initialized.
+      LOG.warn(String.format("Record index not initialized so falling back to 
%s for tagging records", FALLBACK_INDEX_TYPE.name()));
+
+      // Fallback to another index so that tagLocation is still accurate and 
there are no duplicates.
+      HoodieWriteConfig otherConfig = 
HoodieWriteConfig.newBuilder().withProperties(config.getProps())
+          
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(FALLBACK_INDEX_TYPE).build()).build();
+      HoodieIndex fallbackIndex = 
SparkHoodieIndexFactory.createIndex(otherConfig);
+
+      // Fallback index needs to be a global index like record index
+      ValidationUtils.checkArgument(fallbackIndex.isGlobal(), "Fallback index 
needs to be a global index like record index");
+
+      return fallbackIndex.tagLocation(records, context, hoodieTable);
+    }
+
+    // final variable required for lamda functions below
+    final int numFileGroups = fileGroupSize;
+
+    // Partition the record keys to lookup such that each partition looks up 
one record index shard
+    JavaRDD<String> partitionedKeyRDD = HoodieJavaRDD.getJavaRDD(records)
+        .map(HoodieRecord::getRecordKey)
+        .keyBy(k -> HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(k, 
numFileGroups))
+        .partitionBy(new PartitionIdPassthrough(numFileGroups))
+        .map(t -> t._2);
+    ValidationUtils.checkState(partitionedKeyRDD.getNumPartitions() <= 
numFileGroups);
+
+    // Lookup the keys in the record index
+    HoodiePairData<String, HoodieRecordGlobalLocation> keyToLocationPairRDD =
+        HoodieJavaPairRDD.of(partitionedKeyRDD.mapPartitionsToPair(new 
RecordIndexFileGroupLookupFunction(hoodieTable)));
+
+    // Tag the incoming records, as inserts or updates, by joining with 
existing record keys
+    HoodieData<HoodieRecord<R>> taggedRecords = 
tagLocationBackToRecords(keyToLocationPairRDD, records);
+
+    // The number of partitions in the taggedRecords is expected to the 
maximum of the partitions in
+    // keyToLocationPairRDD and records RDD.
+
+    return taggedRecords;
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> 
writeStatuses, HoodieEngineContext context,
+      HoodieTable hoodieTable) {
+    // This is a no-op as metadata record index updates are automatically 
maintained within the metadata table.
+    return writeStatuses;
+  }
+
+  @Override
+  public boolean rollbackCommit(String instantTime) {

Review Comment:
   Maybe we should pass hoodieTable as an argument to rollbackCommit and
   leverage the metadata table in hoodieTable. That way we can fetch
   mdtTimeline from it instead of constructing a new one below.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -911,8 +1018,8 @@ public void close() throws Exception {
   protected void bulkCommit(
       String instantTime, MetadataPartitionType partitionType, 
HoodieData<HoodieRecord> records,
       int fileGroupCount) {
-    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = 
new HashMap<>();
-    partitionRecordsMap.put(partitionType, records);
+    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap =
+        Collections.singletonMap(partitionType, records);

Review Comment:
   We can directly do:
   ```
   commit(instantTime, Collections.singletonMap(partitionType, records))
   ```
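
One thing to keep in mind with this change: the map returned by Collections.singletonMap is immutable, unlike the HashMap it replaces, so this is only safe if nothing downstream adds entries to the map it receives. A small sketch of the difference (the `SingletonMapSketch` helper is hypothetical and only exists to show the behavior):

```java
import java.util.Map;

final class SingletonMapSketch {

  // Returns true if the given map refuses new entries.
  static boolean rejectsPut(Map<String, Integer> map) {
    try {
      map.put("extra", 1);
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }
}
```

Calling `rejectsPut(Collections.singletonMap("files", 1))` returns true, while `rejectsPut(new HashMap<>())` returns false.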



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the 
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object> 
{
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+  // The index to fallback upon when record index is not initialized yet.
+  // This should be a global index like record index so that the behavior of 
tagging across partitions is not changed.
+  private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE = 
IndexType.GLOBAL_SIMPLE;
+
+  public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public <R> HoodieData<HoodieRecord<R>> 
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context, 
HoodieTable hoodieTable) throws HoodieIndexException {
+    int fileGroupSize;
+    try {
+      
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+      fileGroupSize = 
hoodieTable.getMetadataTable().getNumShards(MetadataPartitionType.RECORD_INDEX);
+      ValidationUtils.checkState(fileGroupSize > 0, "Record index should have 
at least one file group");
+    } catch (TableNotFoundException | IllegalStateException e) {
+      // This means that record index has not been initialized.
+      LOG.warn(String.format("Record index not initialized so falling back to 
%s for tagging records", FALLBACK_INDEX_TYPE.name()));
+
+      // Fallback to another index so that tagLocation is still accurate and 
there are no duplicates.
+      HoodieWriteConfig otherConfig = 
HoodieWriteConfig.newBuilder().withProperties(config.getProps())
+          
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(FALLBACK_INDEX_TYPE).build()).build();
+      HoodieIndex fallbackIndex = 
SparkHoodieIndexFactory.createIndex(otherConfig);
+
+      // Fallback index needs to be a global index like record index
+      ValidationUtils.checkArgument(fallbackIndex.isGlobal(), "Fallback index 
needs to be a global index like record index");
+
+      return fallbackIndex.tagLocation(records, context, hoodieTable);
+    }
+
+    // final variable required for lamda functions below
+    final int numFileGroups = fileGroupSize;
+
+    // Partition the record keys to lookup such that each partition looks up 
one record index shard
+    JavaRDD<String> partitionedKeyRDD = HoodieJavaRDD.getJavaRDD(records)
+        .map(HoodieRecord::getRecordKey)
+        .keyBy(k -> HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(k, 
numFileGroups))
+        .partitionBy(new PartitionIdPassthrough(numFileGroups))
+        .map(t -> t._2);
+    ValidationUtils.checkState(partitionedKeyRDD.getNumPartitions() <= 
numFileGroups);
+
+    // Lookup the keys in the record index
+    HoodiePairData<String, HoodieRecordGlobalLocation> keyToLocationPairRDD =
+        HoodieJavaPairRDD.of(partitionedKeyRDD.mapPartitionsToPair(new 
RecordIndexFileGroupLookupFunction(hoodieTable)));
+
+    // Tag the incoming records, as inserts or updates, by joining with 
existing record keys
+    HoodieData<HoodieRecord<R>> taggedRecords = 
tagLocationBackToRecords(keyToLocationPairRDD, records);
+
+    // The number of partitions in the taggedRecords is expected to the 
maximum of the partitions in
+    // keyToLocationPairRDD and records RDD.
+
+    return taggedRecords;
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> 
writeStatuses, HoodieEngineContext context,
+      HoodieTable hoodieTable) {
+    // This is a no-op as metadata record index updates are automatically 
maintained within the metadata table.
+    return writeStatuses;
+  }
+
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    // Only those deltacommits which have a valid completed commit on the 
dataset are read. Since, the instantTime
+    // is being rolled back on the dataset, we will not load the records from 
the deltacommit and it is virtually
+    // rolled back. In other words, there is no need to rollback any 
deltacommit here except if the deltacommit
+    // was compacted and a new basefile has been created.
+    try {
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+          
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath()))
+          .setConf(new Configuration()).build();
+      HoodieTimeline commitTimeline = 
metaClient.getCommitTimeline().filterCompletedInstants();
+      if (commitTimeline.empty()) {
+        // No compaction yet so no need to check for deltacommits due to the 
logic above
+        return true;
+      }
+
+      if (HoodieTimeline.compareTimestamps(instantTime, GREATER_THAN, 
commitTimeline.lastInstant().get().getTimestamp())) {
+        // After the last compaction so no rollback required as per logic above
+        return true;
+      }
+      LOG.warn("Cannot rollback instant " + instantTime + " because the 
corresponding deltacommit has been compacted "
+          + " in " + commitTimeline.lastInstant().get().getTimestamp());
+      return false;
+    } catch (TableNotFoundException e) {
+      // Metadata table is not setup.  Nothing to rollback.  Exit gracefully.
+      LOG.warn("Cannot rollback instant " + instantTime + " as metadata table 
is not found");
+      return true;
+    }
+  }
+
+  @Override
+  public boolean isGlobal() {
+    return true;
+  }
+
+  @Override
+  public boolean canIndexLogFiles() {
+    return true;

Review Comment:
   Are these true? Do we consider new inserts added to log files as well?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -819,9 +917,18 @@ public void buildMetadataPartitions(HoodieEngineContext 
engineContext, List<Hood
    * @param instantTime    Timestamp at which the commit was performed
    */
   @Override
-  public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
-    processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(
-        engineContext, commitMetadata, instantTime, 
getRecordsGenerationParams()));
+  public void update(HoodieCommitMetadata commitMetadata, 
HoodieData<WriteStatus> writeStatus, String instantTime) {
+    processAndCommit(instantTime, () -> {
+      Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordMap =
+          HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, 
commitMetadata, instantTime, getRecordsGenerationParams());
+
+      // Updates for record index are created by parsing the WriteStatus which 
is a hudi-client object. Hence, we cannot yet move this code
+      // to the HoodieTableMetadataUtil class in hudi-common.
+      if (writeStatus != null && !writeStatus.isEmpty()) {
+        partitionToRecordMap.put(MetadataPartitionType.RECORD_INDEX, 
getRecordIndexUpdates(writeStatus));

Review Comment:
   Why can't we move this within convertMetadataToRecords?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -474,14 +511,80 @@ private Pair<Integer, HoodieData<HoodieRecord>> 
initializeBloomFiltersPartition(
     return Pair.of(fileGroupCount, records);
   }
 
-  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeFilesPartition(String createInstantTime, List<DirectoryInfo> 
partitionInfoList) {
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeRecordIndexPartition() throws IOException {
+    final HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(dataMetaClient,

Review Comment:
   We already list all partitions and collect file info in L416 to L422.
   Can we try reusing them when calling initializeRecordIndexPartition()? For
   example, partitionBaseFilePairs in L520 is already available at the caller's
   end, so we don't need to instantiate another FS view.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -494,14 +597,8 @@ private Pair<Integer, HoodieData<HoodieRecord>> 
initializeFilesPartition(String
     engineContext.setJobStatus(this.getClass().getSimpleName(), "Creating 
records for MDT FILES partition");
     HoodieData<HoodieRecord> fileListRecords = 
engineContext.parallelize(partitionInfoList, 
partitionInfoList.size()).map(partitionInfo -> {
       Map<String, Long> fileNameToSizeMap = 
partitionInfo.getFileNameToSizeMap();
-      // filter for files that are part of the completed commits
-      Map<String, Long> validFileNameToSizeMap = 
fileNameToSizeMap.entrySet().stream().filter(fileSizePair -> {

Review Comment:
   May I know where we have moved this code? We need to keep this filtering.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -241,105 +221,91 @@ public List<MetadataPartitionType> 
getEnabledPartitionTypes() {
     return this.enabledPartitionTypes;
   }
 
-  /**
-   * Initialize the metadata table if it does not exist.
-   * <p>
-   * If the metadata table does not exist, then file and partition listing is 
used to initialize the table.
-   *
-   * @param engineContext
-   * @param actionMetadata           Action metadata types extending Avro 
generated SpecificRecordBase
-   * @param inflightInstantTimestamp Timestamp of an instant in progress on 
the dataset. This instant is ignored
-   *                                 while deciding to initialize the metadata 
table.
-   */
-  protected abstract <T extends SpecificRecordBase> void 
initialize(HoodieEngineContext engineContext,
-                                                                    Option<T> 
actionMetadata,
-                                                                    
Option<String> inflightInstantTimestamp);
-
-  public void initTableMetadata() {
-    try {
-      if (this.metadata != null) {
-        this.metadata.close();
-      }
-      this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(),
-          dataWriteConfig.getBasePath(), 
dataWriteConfig.getSpillableMapBasePath());
-      this.metadataMetaClient = metadata.getMetadataMetaClient();
-    } catch (Exception e) {
-      throw new HoodieException("Error initializing metadata table for reads", 
e);
-    }
-  }
-
   /**
    * Initialize the metadata table if needed.
    *
    * @param dataMetaClient           - meta client for the data table
    * @param actionMetadata           - optional action metadata
    * @param inflightInstantTimestamp - timestamp of an instant in progress on 
the dataset
    * @param <T>                      - action metadata types extending Avro 
generated SpecificRecordBase
-   * @throws IOException
+   * @throws IOException on errors
    */
-  protected <T extends SpecificRecordBase> void 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
-                                                                   Option<T> 
actionMetadata,
-                                                                   
Option<String> inflightInstantTimestamp) throws IOException {
+  protected <T extends SpecificRecordBase> boolean 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                                                      
Option<T> actionMetadata,
+                                                                      
Option<String> inflightInstantTimestamp) throws IOException {
     HoodieTimer timer = HoodieTimer.start();
+    List<MetadataPartitionType> partitionsToInit = new 
ArrayList<>(MetadataPartitionType.values().length);
 
-    boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+    try {
+      boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+      if (!exists) {
+        // FILES partition is always required
+        partitionsToInit.add(MetadataPartitionType.FILES);
+      }
 
-    if (!exists) {
-      // Initialize for the first time by listing partitions and files 
directly from the file system
-      if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
-        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      // check if any of the enabled partition types needs to be initialized
+      // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
+      if (!dataWriteConfig.isMetadataAsyncIndex()) {
+        Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+        LOG.info("Async metadata indexing disabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
+        this.enabledPartitionTypes.stream()
+                .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
+                .forEach(partitionsToInit::add);
       }
-      return;
-    }
 
-    // if metadata table exists, then check if any of the enabled partition 
types needs to be initialized
-    // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
-    if (!dataWriteConfig.isMetadataAsyncIndex()) {
-      Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
-      LOG.info("Async metadata indexing enabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
-      List<MetadataPartitionType> partitionsToInit = 
this.enabledPartitionTypes.stream()
-          .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
-          .collect(Collectors.toList());
-      // if there are no partitions to initialize or there is a pending 
operation, then don't initialize in this round
-      if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, 
inflightInstantTimestamp)) {
-        return;
+      if (partitionsToInit.isEmpty()) {
+        // No partitions to initialize
+        initMetadataReader();
+        return true;
+      }
+
+      // If there is no commit on the dataset yet, use the 
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+      // Otherwise, we use the timestamp of the latest completed action.
+      String initializationTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+
+      // Initialize partitions for the first time using data from the files on 
the file system
+      if (!initializeFromFilesystem(initializationTime, partitionsToInit, 
inflightInstantTimestamp)) {
+        LOG.error("Failed to initialize MDT from filesystem");
+        return false;
       }
 
-      String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-      initTableMetadata(); // re-init certain flags in BaseTableMetadata
-      initializeEnabledFileGroups(dataMetaClient, createInstantTime, 
partitionsToInit);
-      initialCommit(createInstantTime, partitionsToInit);
-      updateInitializedPartitionsInTableConfig(partitionsToInit);
+      metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      return true;
+    } catch (IOException e) {
+      LOG.error("Failed to initialize metadata table. Disabling the writer.", 
e);
+      return false;
     }
   }
 
   private <T extends SpecificRecordBase> boolean 
metadataTableExists(HoodieTableMetaClient dataMetaClient,
                                                                      Option<T> 
actionMetadata) throws IOException {
-    boolean exists = dataMetaClient.getFs().exists(new 
Path(metadataWriteConfig.getBasePath(),
-        HoodieTableMetaClient.METAFOLDER_NAME));
+    boolean exists = dataMetaClient.getTableConfig().isMetadataTableEnabled();
     boolean reInitialize = false;
 
     // If the un-synced instants have been archived, then
     // the metadata table will need to be initialized again.
     if (exists) {
-      HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf.get())
-          .setBasePath(metadataWriteConfig.getBasePath()).build();
-
-      if (DEFAULT_METADATA_POPULATE_META_FIELDS != 
metadataMetaClient.getTableConfig().populateMetaFields()) {
-        LOG.info("Re-initiating metadata table properties since populate meta 
fields have changed");
-        metadataMetaClient = 
initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
+      try {
+        metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+        if (DEFAULT_METADATA_POPULATE_META_FIELDS != 
metadataMetaClient.getTableConfig().populateMetaFields()) {
+          LOG.info("Re-initiating metadata table properties since populate 
meta fields have changed");
+          metadataMetaClient = initializeMetaClient();
+        }
+      } catch (TableNotFoundException e) {
+        // Table not found, initialize the metadata table.
+        metadataMetaClient = initializeMetaClient();

Review Comment:
   +1 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java:
##########
@@ -135,8 +138,13 @@ protected byte[] serializeRecords(List<HoodieRecord> 
records) throws IOException
       }
 
       final byte[] recordBytes = serializeRecord(record, writerSchema);
-      ValidationUtils.checkState(!sortedRecordsMap.containsKey(recordKey),
-          "Writing multiple records with same key not supported for " + 
this.getClass().getName());
+      if (sortedRecordsMap.containsKey(recordKey)) {
+        LOG.error("Found duplicate record with recordKey: " + recordKey);

Review Comment:
   I feel we should test how HFile write and read work with duplicate records.
   If HFile itself throws in case of duplicate records, why can't we let them
   through so that we don't incur additional latency checking for dups?



##########
hudi-common/src/main/avro/HoodieMetadata.avsc:
##########
@@ -358,6 +358,46 @@
                 }
             ],
             "default" : null
+        },
+        {
+            "name": "recordIndexMetadata",
+            "doc": "Metadata Index that contains information about record keys 
and their location in the dataset",
+            "type": [
+                "null",
+                 {
+                   "type": "record",
+                   "name": "HoodieRecordIndexInfo",
+                    "fields": [
+                        {
+                            "name": "partition",
+                            "type": "string",
+                            "doc": "Partition which contains the record",
+                            "avro.java.string": "String"

Review Comment:
   For non-partitioned tables, will the value of this field be an empty string?
   I ask because every field here is non-nullable except the
   recordIndexMetadata root field. Just wanted to confirm we are good.



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java:
##########
@@ -126,6 +131,13 @@ public boolean canWrite() {
 
   @Override
   public void writeAvro(String recordKey, IndexedRecord record) throws 
IOException {
+    if (!this.hfileConfig.allowDuplicatesToBeInserted()) {

Review Comment:
   do we have tests for these? 



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -222,32 +192,30 @@ public Map<Pair<String, String>, BloomFilter> 
getBloomFilters(final List<Pair<St
     );
 
     List<String> partitionIDFileIDStrings = new 
ArrayList<>(partitionIDFileIDSortedStrings);
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
hoodieRecordList =
+    Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
         getRecordsByKeys(partitionIDFileIDStrings, 
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
     metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR,
         (timer.endTimer() / partitionIDFileIDStrings.size())));
 
-    Map<Pair<String, String>, BloomFilter> partitionFileToBloomFilterMap = new 
HashMap<>();
-    for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry 
: hoodieRecordList) {
-      if (entry.getRight().isPresent()) {
-        final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
-            entry.getRight().get().getData().getBloomFilterMetadata();
-        if (bloomFilterMetadata.isPresent()) {
-          if (!bloomFilterMetadata.get().getIsDeleted()) {
-            
ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
-            // NOTE: We have to duplicate the [[ByteBuffer]] object here since:
-            //        - Reading out [[ByteBuffer]] mutates its state
-            //        - [[BloomFilterMetadata]] could be re-used, and hence 
have to stay immutable
-            final ByteBuffer bloomFilterByteBuffer =
-                bloomFilterMetadata.get().getBloomFilter().duplicate();
-            final String bloomFilterType = bloomFilterMetadata.get().getType();
-            final BloomFilter bloomFilter = BloomFilterFactory.fromString(
-                
StandardCharsets.UTF_8.decode(bloomFilterByteBuffer).toString(), 
bloomFilterType);
-            
partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()), 
bloomFilter);
-          }
-        } else {
-          LOG.error("Meta index bloom filter missing for: " + 
fileToKeyMap.get(entry.getLeft()));
+    Map<Pair<String, String>, BloomFilter> partitionFileToBloomFilterMap = new 
HashMap<>(hoodieRecords.size());
+    for (final Map.Entry<String, HoodieRecord<HoodieMetadataPayload>> entry : 
hoodieRecords.entrySet()) {

Review Comment:
   OK, looks like we don't do anything if no value is found, so we should be
   good here. But it is something to remember.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -275,30 +243,51 @@ public Map<Pair<String, String>, 
HoodieMetadataColumnStats> getColumnStats(final
 
     List<String> columnStatKeys = new ArrayList<>(sortedKeys);
     HoodieTimer timer = HoodieTimer.start();
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
hoodieRecordList =
+    Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =

Review Comment:
   We should avoid sorting at L234 to L241. getRecordsByKeys() sorts all keys
   anyway before any lookup.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -143,46 +126,33 @@ public List<String> getAllPartitionPaths() throws 
IOException {
    * @param partitionPath The absolute path of the partition to list
    */
   @Override
-  public FileStatus[] getAllFilesInPartition(Path partitionPath)
-      throws IOException {
-    if (isMetadataTableEnabled) {
-      try {
-        return fetchAllFilesInPartition(partitionPath);
-      } catch (Exception e) {
-        throw new HoodieMetadataException("Failed to retrieve files in 
partition " + partitionPath + " from metadata", e);
-      }
+  public FileStatus[] getAllFilesInPartition(Path partitionPath) throws 
IOException {
+    ValidationUtils.checkArgument(isMetadataTableInitialized);
+    try {
+      return fetchAllFilesInPartition(partitionPath);
+    } catch (Exception e) {
+      throw new HoodieMetadataException("Failed to retrieve files in partition 
" + partitionPath + " from metadata", e);
     }
-
-    FileSystemBackedTableMetadata fileSystemBackedTableMetadata =

Review Comment:
   Did we chase down all reader code paths on this? For example,
   HoodieBackedTableMetadata could be used by readers even when the MDT is not
   enabled or not fully formed, so these serve as fallbacks. We should ensure
   all readers use FileSystemBackedTableMetadata if the MDT is not enabled
   before removing these.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -193,121 +190,126 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(L
                   return Collections.emptyIterator();
                 }
 
-              boolean fullKeys = false;
+                boolean fullKeys = false;
 
-              Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> 
logRecords =
-                  readLogRecords(logRecordScanner, sortedKeyPrefixes, 
fullKeys, timings);
+                Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords =
+                    readLogRecords(logRecordScanner, sortedKeyPrefixes, 
fullKeys, timings);
 
-              List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
mergedRecords =
-                  readFromBaseAndMergeWithLogRecords(baseFileReader, 
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
+                Map<String, HoodieRecord<HoodieMetadataPayload>> mergedRecords 
=
+                    readFromBaseAndMergeWithLogRecords(baseFileReader, 
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
 
-              LOG.debug(String.format("Metadata read for %s keys took 
[baseFileRead, logMerge] %s ms",
-                  sortedKeyPrefixes.size(), timings));
+                LOG.debug(String.format("Metadata read for %s keys took 
[baseFileRead, logMerge] %s ms",
+                    sortedKeyPrefixes.size(), timings));
 
-              return mergedRecords.stream()
-                .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
-                .filter(Objects::nonNull)
-                .iterator();
-            } catch (IOException ioe) {
-              throw new HoodieIOException("Error merging records from metadata 
table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
-            } finally {
-              closeReader(readers);
-            }
-          });
+                return mergedRecords.values().iterator();
+              } catch (IOException ioe) {
+                throw new HoodieIOException("Error merging records from 
metadata table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
+              } finally {
+                closeReader(readers);
+              }
+            });
   }
 
   @Override
-  public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
getRecordsByKeys(List<String> keys,
-                                                                               
           String partitionName) {
-    // Sort the columns so that keys are looked up in order
-    List<String> sortedKeys = new ArrayList<>(keys);
-    Collections.sort(sortedKeys);
-    Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = 
getPartitionFileSliceToKeysMapping(partitionName, sortedKeys);
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = 
new ArrayList<>();
-    AtomicInteger fileSlicesKeysCount = new AtomicInteger();
-    partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, 
fileSliceKeys) -> {
-      Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
-          getOrCreateReaders(partitionName, partitionFileSlicePair.getRight());
-      try {
-        List<Long> timings = new ArrayList<>();
-        HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
-        HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
-        if (baseFileReader == null && logRecordScanner == null) {
-          return;
-        }
+  protected Map<String, HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeys(List<String> keys, String partitionName) {
+    if (keys.isEmpty()) {
+      return Collections.emptyMap();
+    }
 
-        boolean fullKeys = true;
-        Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
-            readLogRecords(logRecordScanner, fileSliceKeys, fullKeys, timings);
-
-        result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader, 
fileSliceKeys, fullKeys, logRecords,
-            timings, partitionName));
-
-        LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, 
logMerge] %s ms",
-            fileSliceKeys.size(), timings));
-        fileSlicesKeysCount.addAndGet(fileSliceKeys.size());
-      } catch (IOException ioe) {
-        throw new HoodieIOException("Error merging records from metadata table 
for  " + sortedKeys.size() + " key : ", ioe);
-      } finally {
-        if (!reuse) {
-          closeReader(readers);
-        }
+    Map<String, HoodieRecord<HoodieMetadataPayload>> result;
+

Review Comment:
   I see we were sorting the keys before and have now removed that. Can we keep
   the sorting here and remove it from every other flow? The entire metadata
   table is based on HFile, which works on sorted entries, so it makes sense
   to sort the keys here.
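
   A minimal sketch of sorting once inside the lookup, so that callers can
   pass keys in any order. The class `SortedLookupSketch` and its TreeMap
   store are hypothetical stand-ins for an HFile-backed reader, not Hudi's
   actual API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a sorted, HFile-like store; not Hudi's reader API.
final class SortedLookupSketch {
  private final TreeMap<String, String> sortedStore = new TreeMap<>();

  SortedLookupSketch(Map<String, String> entries) {
    sortedStore.putAll(entries);
  }

  // Sorts a copy of the keys once, so no caller has to pre-sort them.
  // Keys that are not in the store are simply left out of the result.
  Map<String, String> getRecordsByKeys(List<String> keys) {
    List<String> sortedKeys = new ArrayList<>(keys);
    Collections.sort(sortedKeys);

    // LinkedHashMap keeps the sorted lookup order in the returned map.
    Map<String, String> result = new LinkedHashMap<>();
    for (String key : sortedKeys) {
      String value = sortedStore.get(key);
      if (value != null) {
        result.put(key, value);
      }
    }
    return result;
  }
}
```

   With the sort in one place, the pre-sorting in getBloomFilters() and
   getColumnStats() style callers would become redundant.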



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -222,32 +192,30 @@ public Map<Pair<String, String>, BloomFilter> 
getBloomFilters(final List<Pair<St
     );
 
     List<String> partitionIDFileIDStrings = new 
ArrayList<>(partitionIDFileIDSortedStrings);
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
hoodieRecordList =
+    Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
         getRecordsByKeys(partitionIDFileIDStrings, 
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
     metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR,
         (timer.endTimer() / partitionIDFileIDStrings.size())));
 
-    Map<Pair<String, String>, BloomFilter> partitionFileToBloomFilterMap = new 
HashMap<>();
-    for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry 
: hoodieRecordList) {
-      if (entry.getRight().isPresent()) {
-        final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
-            entry.getRight().get().getData().getBloomFilterMetadata();
-        if (bloomFilterMetadata.isPresent()) {
-          if (!bloomFilterMetadata.get().getIsDeleted()) {
-            
ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
-            // NOTE: We have to duplicate the [[ByteBuffer]] object here since:
-            //        - Reading out [[ByteBuffer]] mutates its state
-            //        - [[BloomFilterMetadata]] could be re-used, and hence 
have to stay immutable
-            final ByteBuffer bloomFilterByteBuffer =
-                bloomFilterMetadata.get().getBloomFilter().duplicate();
-            final String bloomFilterType = bloomFilterMetadata.get().getType();
-            final BloomFilter bloomFilter = BloomFilterFactory.fromString(
-                
StandardCharsets.UTF_8.decode(bloomFilterByteBuffer).toString(), 
bloomFilterType);
-            
partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()), 
bloomFilter);
-          }
-        } else {
-          LOG.error("Meta index bloom filter missing for: " + 
fileToKeyMap.get(entry.getLeft()));
+    Map<Pair<String, String>, BloomFilter> partitionFileToBloomFilterMap = new 
HashMap<>(hoodieRecords.size());
+    for (final Map.Entry<String, HoodieRecord<HoodieMetadataPayload>> entry : 
hoodieRecords.entrySet()) {

Review Comment:
   We should go through the incoming list here, i.e. partitionNameFileNameList,
   because with the perf fix we made, getRecordsByKeys() only returns valid
   entries. If some key is not found, no value is returned for it, so callers
   have to account for that.
   That is why, in RLI, we do a left outer join with the incoming records.
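
   A minimal sketch of the caller-side pattern: iterate over the requested
   keys rather than over the lookup result, so that misses stay visible. The
   class and method names (`MissingKeyLookupSketch`, `resolveAll`,
   `missingKeys`) are hypothetical and only illustrate the idea; the String
   values stand in for metadata records.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class MissingKeyLookupSketch {

  // Walks the requested keys (not the lookup result), which behaves like a
  // left outer join: every requested key appears, found or not.
  static Map<String, Optional<String>> resolveAll(List<String> requestedKeys,
                                                  Map<String, String> foundRecords) {
    Map<String, Optional<String>> resolved = new LinkedHashMap<>();
    for (String key : requestedKeys) {
      resolved.put(key, Optional.ofNullable(foundRecords.get(key)));
    }
    return resolved;
  }

  // Collects the requested keys for which the lookup returned nothing.
  static List<String> missingKeys(List<String> requestedKeys,
                                  Map<String, String> foundRecords) {
    List<String> missing = new ArrayList<>();
    for (String key : requestedKeys) {
      if (!foundRecords.containsKey(key)) {
        missing.add(key);
      }
    }
    return missing;
  }
}
```

   Iterating over the result map alone would silently skip any key that the
   lookup did not return.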



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java:
##########
@@ -121,7 +123,7 @@ public HoodieCleanMetadata doClean(String commitTime, 
Map<String, Integer> parti
   public HoodieTestTable addCompaction(String instantTime, 
HoodieCommitMetadata commitMetadata) throws Exception {
     super.addCompaction(instantTime, commitMetadata);
     if (writer != null) {
-      writer.update(commitMetadata, instantTime);
+      writer.update(commitMetadata, 
HoodieListData.eager(Collections.EMPTY_LIST), instantTime);

Review Comment:
   All of these need to be fixed.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -123,6 +124,9 @@ public static HoodieWriteConfig createMetadataWriteConfig(
         .withAllowMultiWriteOnSameInstant(true)
         
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
         .withPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS)
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            
.hfileWriterToAllowDuplicates(writeConfig.hfileWriterToAllowDuplicates())

Review Comment:
   Curious to understand in which flow we allow duplicates to be written to an
   HFile. Also, to my knowledge, I don't think our readers (full scan or
   on-demand) can handle duplicates in an HFile.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -96,22 +93,24 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   private final Transient<Map<Pair<String, String>, 
Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader>>> 
partitionReaders =
       Transient.lazy(ConcurrentHashMap::new);
 
-  public HoodieBackedTableMetadata(HoodieEngineContext engineContext, 
HoodieMetadataConfig metadataConfig,
-                                   String datasetBasePath, String 
spillableMapDirectory) {
-    this(engineContext, metadataConfig, datasetBasePath, 
spillableMapDirectory, false);
+  // Latest file slices in the metadata partitions
+  private final Map<String, List<FileSlice>> partitionFileSliceMap = new 
ConcurrentHashMap<>();

Review Comment:
   Let's ensure we clean this up in reset() and close(), especially reset();
   otherwise it could lead to stale data being served from the timeline server.
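
   A minimal sketch of the cleanup being asked for, assuming a cache shaped
   like partitionFileSliceMap. The class `FileSliceCacheSketch`, its loader
   function, and the String file slice names are hypothetical; the real
   class holds FileSlice objects and more state.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache holder; illustrates clearing cached slices on reset/close.
final class FileSliceCacheSketch implements AutoCloseable {
  // Latest file slices per metadata partition, loaded lazily.
  private final Map<String, List<String>> partitionFileSliceMap = new ConcurrentHashMap<>();
  private final Function<String, List<String>> loader;

  FileSliceCacheSketch(Function<String, List<String>> loader) {
    this.loader = loader;
  }

  List<String> getFileSlices(String partitionName) {
    return partitionFileSliceMap.computeIfAbsent(partitionName, loader);
  }

  // Drops cached slices so the next lookup reloads the latest ones.
  void reset() {
    partitionFileSliceMap.clear();
  }

  @Override
  public void close() {
    partitionFileSliceMap.clear();
  }

  int cachedPartitions() {
    return partitionFileSliceMap.size();
  }
}
```

   Without the clear() in reset(), a long-lived instance would keep
   returning the slices it loaded first, even after new ones are written.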



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -193,121 +190,126 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(L
                   return Collections.emptyIterator();
                 }
 
-              boolean fullKeys = false;
+                boolean fullKeys = false;
 
-              Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> 
logRecords =
-                  readLogRecords(logRecordScanner, sortedKeyPrefixes, 
fullKeys, timings);
+                Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords =
+                    readLogRecords(logRecordScanner, sortedKeyPrefixes, 
fullKeys, timings);
 
-              List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
mergedRecords =
-                  readFromBaseAndMergeWithLogRecords(baseFileReader, 
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
+                Map<String, HoodieRecord<HoodieMetadataPayload>> mergedRecords 
=
+                    readFromBaseAndMergeWithLogRecords(baseFileReader, 
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
 
-              LOG.debug(String.format("Metadata read for %s keys took 
[baseFileRead, logMerge] %s ms",
-                  sortedKeyPrefixes.size(), timings));
+                LOG.debug(String.format("Metadata read for %s keys took 
[baseFileRead, logMerge] %s ms",
+                    sortedKeyPrefixes.size(), timings));
 
-              return mergedRecords.stream()
-                .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
-                .filter(Objects::nonNull)
-                .iterator();
-            } catch (IOException ioe) {
-              throw new HoodieIOException("Error merging records from metadata 
table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
-            } finally {
-              closeReader(readers);
-            }
-          });
+                return mergedRecords.values().iterator();
+              } catch (IOException ioe) {
+                throw new HoodieIOException("Error merging records from 
metadata table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
+              } finally {
+                closeReader(readers);
+              }
+            });
   }
 
   @Override
-  public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
getRecordsByKeys(List<String> keys,
-                                                                               
           String partitionName) {
-    // Sort the columns so that keys are looked up in order
-    List<String> sortedKeys = new ArrayList<>(keys);
-    Collections.sort(sortedKeys);
-    Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = 
getPartitionFileSliceToKeysMapping(partitionName, sortedKeys);
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = 
new ArrayList<>();
-    AtomicInteger fileSlicesKeysCount = new AtomicInteger();
-    partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, 
fileSliceKeys) -> {
-      Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
-          getOrCreateReaders(partitionName, partitionFileSlicePair.getRight());
-      try {
-        List<Long> timings = new ArrayList<>();
-        HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
-        HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
-        if (baseFileReader == null && logRecordScanner == null) {
-          return;
-        }
+  protected Map<String, HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeys(List<String> keys, String partitionName) {
+    if (keys.isEmpty()) {
+      return Collections.emptyMap();
+    }
 
-        boolean fullKeys = true;
-        Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
-            readLogRecords(logRecordScanner, fileSliceKeys, fullKeys, timings);
-
-        result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader, 
fileSliceKeys, fullKeys, logRecords,
-            timings, partitionName));
-
-        LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, 
logMerge] %s ms",
-            fileSliceKeys.size(), timings));
-        fileSlicesKeysCount.addAndGet(fileSliceKeys.size());
-      } catch (IOException ioe) {
-        throw new HoodieIOException("Error merging records from metadata table 
for  " + sortedKeys.size() + " key : ", ioe);
-      } finally {
-        if (!reuse) {
-          closeReader(readers);
-        }
+    Map<String, HoodieRecord<HoodieMetadataPayload>> result;
+
+    // Load the file slices for the partition. Each file slice is a shard 
which saves a portion of the keys.
+    List<FileSlice> partitionFileSlices = 
partitionFileSliceMap.computeIfAbsent(partitionName,
+        k -> 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
metadataFileSystemView, partitionName));
+    final int numFileSlices = partitionFileSlices.size();
+    ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for 
partition " + partitionName + " should be > 0");
+
+    // Lookup keys from each file slice
+    if (numFileSlices == 1) {
+      // Optimization for a single slice for smaller metadata table partitions
+      result = lookupKeysFromFileSlice(partitionName, keys, 
partitionFileSlices.get(0));
+    } else {
+      // Parallel lookup for large sized partitions with many file slices
+      // Partition the keys by the file slice which contains it
+      ArrayList<ArrayList<String>> partitionedKeys = new 
ArrayList<>(numFileSlices);
+      for (int i = 0; i < numFileSlices; ++i) {
+        partitionedKeys.add(new ArrayList<>());
       }
-    });
+      keys.forEach(key -> {
+        int shardIndex = 
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, numFileSlices);

Review Comment:
   Not for this patch, but the driver (in the case of the RLI-based index) 
   already maps keys to file groups by hash and calls mapPartitions. So if we 
   pass the file group id along with getRecordsByKeys(), we can avoid these 
   additional mapping calls. At least with RLI, each Spark task is going to 
   look up in only one file slice. 
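
   A rough sketch of the idea, assuming a hypothetical helper that accepts an 
   optional, already-computed file group index. None of the names below are 
   Hudi APIs, and the hash is only a stand-in for 
   HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex, not its actual logic:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalInt;

// Hypothetical sketch: these names are illustrative and are not Hudi APIs.
final class FileGroupLookupSketch {

  // Stand-in hash mapping; NOT the real mapRecordKeyToFileGroupIndex.
  static int fileGroupIndexFor(String key, int numFileGroups) {
    return Math.floorMod(key.hashCode(), numFileGroups);
  }

  // Returns one key list per file group. If the caller already knows that all
  // keys belong to a single file group, the per-key hashing is skipped.
  static List<List<String>> bucketKeys(List<String> keys, int numFileGroups, OptionalInt knownFileGroup) {
    List<List<String>> buckets = new ArrayList<>(numFileGroups);
    for (int i = 0; i < numFileGroups; i++) {
      buckets.add(new ArrayList<>());
    }
    if (knownFileGroup.isPresent()) {
      // Caller (e.g. a task that was partitioned by file group) did the mapping already.
      buckets.get(knownFileGroup.getAsInt()).addAll(keys);
      return buckets;
    }
    for (String key : keys) {
      buckets.get(fileGroupIndexFor(key, numFileGroups)).add(key);
    }
    return buckets;
  }

  public static void main(String[] args) {
    // Without a hint every key is hashed again.
    System.out.println(bucketKeys(Arrays.asList("a", "b", "c"), 2, OptionalInt.empty()));
    // With a hint the keys go straight to the known file group.
    System.out.println(bucketKeys(Arrays.asList("a", "c"), 2, OptionalInt.of(1)));
  }
}
```

   The hint only helps when the caller's partitioning uses the same mapping 
   function and the same file group count as the reader, so that would need 
   to be validated or documented.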



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -296,23 +324,6 @@ public static HoodieRecord<HoodieMetadataPayload> 
createPartitionListRecord(List
     return new HoodieAvroRecord<>(key, payload);
   }
 
-  /**
-   * Create and return a {@code HoodieMetadataPayload} to save list of 
partitions.
-   *
-   * @param partitionsAdded   The list of added partitions
-   * @param partitionsDeleted The list of deleted partitions
-   */
-  public static HoodieRecord<HoodieMetadataPayload> 
createPartitionListRecord(List<String> partitionsAdded, List<String> 
partitionsDeleted) {

Review Comment:
   Was this never used at all?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -241,105 +221,91 @@ public List<MetadataPartitionType> 
getEnabledPartitionTypes() {
     return this.enabledPartitionTypes;
   }
 
-  /**
-   * Initialize the metadata table if it does not exist.
-   * <p>
-   * If the metadata table does not exist, then file and partition listing is 
used to initialize the table.
-   *
-   * @param engineContext
-   * @param actionMetadata           Action metadata types extending Avro 
generated SpecificRecordBase
-   * @param inflightInstantTimestamp Timestamp of an instant in progress on 
the dataset. This instant is ignored
-   *                                 while deciding to initialize the metadata 
table.
-   */
-  protected abstract <T extends SpecificRecordBase> void 
initialize(HoodieEngineContext engineContext,
-                                                                    Option<T> 
actionMetadata,
-                                                                    
Option<String> inflightInstantTimestamp);
-
-  public void initTableMetadata() {
-    try {
-      if (this.metadata != null) {
-        this.metadata.close();
-      }
-      this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(),
-          dataWriteConfig.getBasePath(), 
dataWriteConfig.getSpillableMapBasePath());
-      this.metadataMetaClient = metadata.getMetadataMetaClient();
-    } catch (Exception e) {
-      throw new HoodieException("Error initializing metadata table for reads", 
e);
-    }
-  }
-
   /**
    * Initialize the metadata table if needed.
    *
    * @param dataMetaClient           - meta client for the data table
    * @param actionMetadata           - optional action metadata
    * @param inflightInstantTimestamp - timestamp of an instant in progress on 
the dataset
    * @param <T>                      - action metadata types extending Avro 
generated SpecificRecordBase
-   * @throws IOException
+   * @throws IOException on errors
    */
-  protected <T extends SpecificRecordBase> void 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
-                                                                   Option<T> 
actionMetadata,
-                                                                   
Option<String> inflightInstantTimestamp) throws IOException {
+  protected <T extends SpecificRecordBase> boolean 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                                                      
Option<T> actionMetadata,
+                                                                      
Option<String> inflightInstantTimestamp) throws IOException {
     HoodieTimer timer = HoodieTimer.start();
+    List<MetadataPartitionType> partitionsToInit = new 
ArrayList<>(MetadataPartitionType.values().length);
 
-    boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+    try {
+      boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+      if (!exists) {
+        // FILES partition is always required
+        partitionsToInit.add(MetadataPartitionType.FILES);
+      }
 
-    if (!exists) {
-      // Initialize for the first time by listing partitions and files 
directly from the file system
-      if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
-        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      // check if any of the enabled partition types needs to be initialized
+      // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
+      if (!dataWriteConfig.isMetadataAsyncIndex()) {
+        Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+        LOG.info("Async metadata indexing disabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
+        this.enabledPartitionTypes.stream()
+                .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
+                .forEach(partitionsToInit::add);
       }
-      return;
-    }
 
-    // if metadata table exists, then check if any of the enabled partition 
types needs to be initialized
-    // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
-    if (!dataWriteConfig.isMetadataAsyncIndex()) {
-      Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
-      LOG.info("Async metadata indexing enabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
-      List<MetadataPartitionType> partitionsToInit = 
this.enabledPartitionTypes.stream()
-          .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
-          .collect(Collectors.toList());
-      // if there are no partitions to initialize or there is a pending 
operation, then don't initialize in this round
-      if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, 
inflightInstantTimestamp)) {
-        return;
+      if (partitionsToInit.isEmpty()) {
+        // No partitions to initialize
+        initMetadataReader();
+        return true;

Review Comment:
   guess it has to be true.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java:
##########
@@ -224,4 +232,9 @@ HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(List<Str
    * Clear the states of the table metadata.
    */
   void reset();
+
+  /**
+   * Returns the number of shards in a metadata table partition.
+   */
+  int getNumShards(MetadataPartitionType partition);

Review Comment:
   "Shards" is new terminology for Hudi in general. An engineer who comes from 
   a database or data infra background might know what a shard is, but can we 
   use "getNumFileGroups" so that it's in line with our configs like min file 
   groups, max file groups, etc.? 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1000,87 +1136,78 @@ protected void cleanIfNecessary(BaseHoodieWriteClient 
writeClient, String instan
     // Trigger cleaning with suffixes based on the same instant time. This 
ensures that any future
     // delta commits synced over will not have an instant time lesser than the 
last completed instant on the
     // metadata table.
-    writeClient.clean(instantTime + "002");
+    
writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime));
     writeClient.lazyRollbackFailedIndexing();
   }
 
   /**
-   * This is invoked to initialize metadata table for a dataset.
-   * Initial commit has special handling mechanism due to its scale compared 
to other regular commits.
-   * During cold startup, the list of files to be committed can be huge.
-   * So creating a HoodieCommitMetadata out of these large number of files,
-   * and calling the existing update(HoodieCommitMetadata) function does not 
scale well.
-   * Hence, we have a special commit just for the initialization scenario.
+   * Validates the timeline for both main and metadata tables to ensure 
compaction on MDT can be scheduled.
    */
-  private void initialCommit(String createInstantTime, 
List<MetadataPartitionType> partitionTypes) {
-    // List all partitions in the basePath of the containing dataset
-    LOG.info("Initializing metadata table by using file listings in " + 
dataWriteConfig.getBasePath());
-    engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing 
metadata table by listing files and partitions: " + 
dataWriteConfig.getTableName());
-
-    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap 
= new HashMap<>();
-
-    // skip file system listing to populate metadata records if it's a fresh 
table.
-    // this is applicable only if the table already has N commits and metadata 
is enabled at a later point in time.
-    if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // 
SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table.
-      // If not, last completed commit in data table will be chosen as the 
initial commit time.
-      LOG.info("Triggering empty Commit to metadata to initialize");
-    } else {
-      List<DirectoryInfo> partitionInfoList = 
listAllPartitions(dataMetaClient);
-      Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
-          .map(p -> {
-            String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
-            return Pair.of(partitionName, p.getFileNameToSizeMap());
-          })
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-      int totalDataFilesCount = 
partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
-      List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
-      if (partitionTypes.contains(MetadataPartitionType.FILES)) {
-        // Record which saves the list of all partitions
-        HoodieRecord allPartitionRecord = 
HoodieMetadataPayload.createPartitionListRecord(partitions);
-        HoodieData<HoodieRecord> filesPartitionRecords = 
getFilesPartitionRecords(createInstantTime, partitionInfoList, 
allPartitionRecord);
-        ValidationUtils.checkState(filesPartitionRecords.count() == 
(partitions.size() + 1));
-        partitionToRecordsMap.put(MetadataPartitionType.FILES, 
filesPartitionRecords);
-      }
-
-      if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && 
totalDataFilesCount > 0) {
-        final HoodieData<HoodieRecord> recordsRDD = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
-            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
-        partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, 
recordsRDD);
-      }
+  private boolean validateTimelineBeforeSchedulingCompaction(Option<String> 
inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
+    // we need to find if there are any inflights in data table timeline 
before or equal to the latest delta commit in metadata table.
+    // Whenever you want to change this logic, please ensure all below 
scenarios are considered.
+    // a. There could be a chance that latest delta commit in MDT is committed 
in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed
+    // b. There could be DT inflights after latest delta commit in MDT and we 
are ok with it. bcoz, the contract is, latest compaction instant time in MDT 
represents
+    // any instants before that is already synced with metadata table.
+    // c. Do consider out of order commits. For eg, c4 from DT could complete 
before c3. and we can't trigger compaction in MDT with c4 as base instant time, 
until every
+    // instant before c4 is synced with metadata table.
+    List<HoodieInstant> pendingInstants = 
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+        
.findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
 
-      if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && 
totalDataFilesCount > 0) {
-        final HoodieData<HoodieRecord> recordsRDD = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
-        partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, 
recordsRDD);
-      }
-      LOG.info("Committing " + partitions.size() + " partitions and " + 
totalDataFilesCount + " files to metadata");
+    if (!pendingInstants.isEmpty()) {
+      checkNumDeltaCommits(metadataMetaClient, 
dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending());
+      LOG.info(String.format(
+          "Cannot compact metadata table as there are %d inflight instants in 
data table before latest deltacommit in metadata table: %s. Inflight instants 
in data table: %s",
+          pendingInstants.size(), latestDeltaCommitTimeInMetadataTable, 
Arrays.toString(pendingInstants.toArray())));
+      return false;
     }
 
-    commit(createInstantTime, partitionToRecordsMap, false);
+    return true;
   }
 
-  private HoodieData<HoodieRecord> getFilesPartitionRecords(String 
createInstantTime, List<DirectoryInfo> partitionInfoList, HoodieRecord 
allPartitionRecord) {
-    HoodieData<HoodieRecord> filesPartitionRecords = 
engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
-    if (partitionInfoList.isEmpty()) {
-      return filesPartitionRecords;
-    }
+  /**
+   * Return records that represent update to the record index due to write 
operation on the dataset.
+   *
+   * @param writeStatuses (@code WriteStatus} from the write operation
+   */
+  private HoodieData<HoodieRecord> 
getRecordIndexUpdates(HoodieData<WriteStatus> writeStatuses) {
+    return writeStatuses.flatMap(writeStatus -> {
+      List<HoodieRecord> recordList = new LinkedList<>();
+      for (HoodieRecord writtenRecord : writeStatus.getWrittenRecords()) {
+        if (!writeStatus.isErrored(writtenRecord.getKey())) {
+          HoodieRecord hoodieRecord;
+          HoodieKey key = writtenRecord.getKey();
+          Option<HoodieRecordLocation> newLocation = 
writtenRecord.getNewLocation();
+          if (newLocation.isPresent()) {
+            if (writtenRecord.getCurrentLocation() != null) {
+              // This is an update, no need to update index if the location 
has not changed
+              // newLocation should have the same fileID as currentLocation. 
The instantTimes differ as newLocation's
+              // instantTime refers to the current commit which was completed.
+              if 
(!writtenRecord.getCurrentLocation().getFileId().equals(newLocation.get().getFileId()))
 {
+                final String msg = String.format("Detected update in location 
of record with key %s from %s "

Review Comment:
   we can take it up in a follow up patch



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1620,4 +1620,72 @@ public static String createIndexInitTimestamp(String 
timestamp, int offset) {
   public static String createLogCompactionTimestamp(String timestamp) {
     return timestamp + LOG_COMPACTION_TIMESTAMP_SUFFIX;
   }
+
+  /**
+   * Estimates the file group count to use for a MDT partition.
+   *
+   * @param partitionType         Type of the partition for which the file 
group count is to be estimated.
+   * @param recordCount           The number of records expected to be written.
+   * @param averageRecordSize     Average size of each record to be writen.
+   * @param minFileGroupCount     Minimum number of file groups to use.
+   * @param maxFileGroupCount     Maximum number of file groups to use.
+   * @param growthFactor          By what factor are the records (recordCount) 
expected to grow?
+   * @param maxFileGroupSizeBytes Maximum size of the file group.
+   * @return The estimated number of file groups.
+   */
+  public static int estimateFileGroupCount(MetadataPartitionType 
partitionType, long recordCount, int averageRecordSize, int minFileGroupCount,

Review Comment:
   do we have tests for this? 
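
   For illustration, these are the kinds of boundary cases a test could pin 
   down. The estimator below is a hypothetical stand-in (projected bytes 
   divided by the file group size limit, clamped to the min/max), not the 
   implementation in this PR, and the parameter types after 
   minFileGroupCount are assumptions:

```java
// Hypothetical stand-in estimator plus boundary cases a unit test could cover.
final class FileGroupCountEstimateSketch {

  // Assumed formula: projected bytes divided by the per-file-group size limit,
  // clamped to [minFileGroupCount, maxFileGroupCount]. Not the PR's implementation.
  static int estimate(long recordCount, int averageRecordSize, int minFileGroupCount,
                      int maxFileGroupCount, float growthFactor, int maxFileGroupSizeBytes) {
    double projectedBytes = (double) recordCount * growthFactor * averageRecordSize;
    long needed = (long) Math.ceil(projectedBytes / maxFileGroupSizeBytes);
    return (int) Math.max(minFileGroupCount, Math.min(maxFileGroupCount, needed));
  }

  static void check(boolean condition, String message) {
    if (!condition) {
      throw new IllegalStateException(message);
    }
  }

  public static void main(String[] args) {
    // No records: falls back to the configured minimum.
    check(estimate(0, 100, 2, 10, 2.0f, 50_000) == 2, "empty table should use min");
    // 1000 records * 2.0 growth * 100 bytes = 200000 bytes, i.e. 4 groups of 50000 bytes.
    check(estimate(1_000, 100, 1, 10, 2.0f, 50_000) == 4, "exact fit should not round up");
    // One more record spills into a fifth file group.
    check(estimate(1_001, 100, 1, 10, 2.0f, 50_000) == 5, "partial group should round up");
    // Very large input: capped at the configured maximum.
    check(estimate(1_000_000_000L, 100, 1, 10, 10.0f, 50_000) == 10, "large table should use max");
    System.out.println("all estimate checks passed");
  }
}
```

   Whatever the real formula is, cases for zero records, min == max, and 
   overflow-sized inputs seem worth covering.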



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java:
##########
@@ -545,6 +545,33 @@ public void close() {
     }
   }
 
+  @Override
+  public ClosableIterator<String> getRecordKeyIterator() {
+    final HFileScanner scanner = reader.getScanner(false, false);

Review Comment:
   NTS: this is not used for metadata table reads; 
   we use getRecordsByKeysIterator() there. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -111,18 +111,27 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
 
   public static final String METADATA_COMPACTION_TIME_SUFFIX = "001";
 
+  // Virtual keys support for metadata table. This Field is
+  // from the metadata payload schema.
+  private static final String RECORD_KEY_FIELD_NAME = 
HoodieMetadataPayload.KEY_FIELD_NAME;
+
+  // Average size of a record saved within the record index.
+  // Record index has a fixed size schema. This has been calculated based on 
experiments with default settings
+  // for block size (4MB), compression (GZ) and disabling the hudi metadata 
fields.

Review Comment:
   Has this comment been addressed?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.execution.PartitionIdPassthrough;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the 
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object> 
{
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+  // The index to fallback upon when record index is not initialized yet.
+  // This should be a global index like record index so that the behavior of 
tagging across partitions is not changed.
+  private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE = 
IndexType.GLOBAL_SIMPLE;
+
+  public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public <R> HoodieData<HoodieRecord<R>> 
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context, 
HoodieTable hoodieTable) throws HoodieIndexException {
+    int fileGroupSize;
+    try {
+      
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+      fileGroupSize = 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(hoodieTable.getMetaClient(),
 (HoodieTableFileSystemView) hoodieTable.getFileSystemView(),
+          MetadataPartitionType.RECORD_INDEX.getPartitionPath()).size();
+      ValidationUtils.checkState(fileGroupSize > 0, "Record index should have 
at least one file group");
+    } catch (TableNotFoundException | IllegalStateException e) {
+      // This means that record index has not been initialized.
+      LOG.warn(String.format("Record index not initialized so falling back to 
%s for tagging records", FALLBACK_INDEX_TYPE.name()));
+
+      // Fallback to another index so that tagLocation is still accurate and 
there are no duplicates.
+      HoodieWriteConfig otherConfig = 
HoodieWriteConfig.newBuilder().withProperties(config.getProps())
+          
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(FALLBACK_INDEX_TYPE).build()).build();
+      HoodieIndex fallbackIndex = 
SparkHoodieIndexFactory.createIndex(otherConfig);
+
+      // Fallback index needs to be a global index like record index
+      ValidationUtils.checkArgument(fallbackIndex.isGlobal(), "Fallback index 
needs to be a global index like record index");
+
+      return fallbackIndex.tagLocation(records, context, hoodieTable);
+    }
+
+    // final variable required for lamda functions below
+    final int numFileGroups = fileGroupSize;
+
+    // Partition the record keys to lookup such that each partition looks up 
one record index shard
+    JavaRDD<String> partitionedKeyRDD = HoodieJavaRDD.getJavaRDD(records)
+        .map(HoodieRecord::getRecordKey)
+        .keyBy(k -> HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(k, 
numFileGroups))
+        .partitionBy(new PartitionIdPassthrough(numFileGroups))
+        .map(t -> t._2);
+    ValidationUtils.checkState(partitionedKeyRDD.getNumPartitions() <= 
numFileGroups);
+
+    // Lookup the keys in the record index
+    HoodiePairData<String, HoodieRecordGlobalLocation> keyToLocationPairRDD =
+        HoodieJavaPairRDD.of(partitionedKeyRDD.mapPartitionsToPair(new 
RecordIndexFileGroupLookupFunction(hoodieTable)));
+
+    // Tag the incoming records, as inserts or updates, by joining with 
existing record keys
+    HoodieData<HoodieRecord<R>> taggedRecords = 
tagLocationBackToRecords(keyToLocationPairRDD, records);
+
+    // The number of partitions in the taggedRecords is expected to the 
maximum of the partitions in
+    // keyToLocationPairRDD and records RDD.
+
+    return taggedRecords;
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> 
writeStatuses, HoodieEngineContext context,
+      HoodieTable hoodieTable) {
+    // This is a no-op as metadata record index updates are automatically 
maintained within the metadata table.
+    return writeStatuses;
+  }
+
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    // Only those deltacommits which have a valid completed commit on the 
dataset are read. Since, the instantTime
+    // is being rolled back on the dataset, we will not load the records from 
the deltacommit and it is virtually
+    // rolled back. In other words, there is no need to rollback any 
deltacommit here except if the deltacommit
+    // was compacted and a new basefile has been created.
+    try {
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+          
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath()))
+          .setConf(new Configuration()).build();
+      HoodieTimeline commitTimeline = 
metaClient.getCommitTimeline().filterCompletedInstants();
+      if (commitTimeline.empty()) {
+        // No compaction yet so no need to check for deltacommits due to the 
logic above
+        return true;
+      }
+
+      if (HoodieTimeline.compareTimestamps(instantTime, GREATER_THAN, 
commitTimeline.lastInstant().get().getTimestamp())) {
+        // After the last compaction so no rollback required as per logic above
+        return true;
+      }
+      LOG.warn("Cannot rollback instant " + instantTime + " because the 
corresponding deltacommit has been compacted "
+          + " in " + commitTimeline.lastInstant().get().getTimestamp());
+      return false;
+    } catch (TableNotFoundException e) {
+      // Metadata table is not setup.  Nothing to rollback.  Exit gracefully.
+      LOG.warn("Cannot rollback instant " + instantTime + " as metadata table 
is not found");
+      return true;
+    }
+  }
+
+  @Override
+  public boolean isGlobal() {
+    return true;
+  }
+
+  @Override
+  public boolean canIndexLogFiles() {
+    return true;
+  }
+
+  @Override
+  public boolean isImplicitWithStorage() {
+    return false;
+  }
+
+  private <R> HoodieData<HoodieRecord<R>> tagLocationBackToRecords(
+      HoodiePairData<String, HoodieRecordGlobalLocation> keyFilenamePair,
+      HoodieData<HoodieRecord<R>> records) {
+    HoodiePairData<String, HoodieRecord<R>> keyRecordPairs =
+        records.mapToPair(record -> new ImmutablePair<>(record.getRecordKey(), 
record));
+    // Here as the records might have more data than keyFilenamePairs (some 
row keys' not found in record index),
+    // we will do left outer join.
+    return keyRecordPairs.leftOuterJoin(keyFilenamePair).values()
+        .map(v -> {
+          HoodieRecord<R> record = v.getLeft();
+          Option<HoodieRecordGlobalLocation> location = 
Option.ofNullable(v.getRight().orElse(null));
+          if (!location.isPresent()) {
+            // No location found.
+            return record;
+          }
+          // Ensure the partitionPath is also set correctly in the key
+          if 
(!record.getPartitionPath().equals(location.get().getPartitionPath())) {
+            record = new HoodieAvroRecord(new HoodieKey(record.getRecordKey(), 
location.get().getPartitionPath()), (HoodieRecordPayload) record.getData());
+          }
+
+          // Perform the tagging. Not using HoodieIndexUtils.getTaggedRecord 
to prevent an additional copy which is not necessary for this index.
+          record.unseal();
+          record.setCurrentLocation(location.get());
+          record.seal();
+          return record;
+        });
+  }
+
+  /**
+   * Function that lookups a list of keys in a single shard of the record index
+   */
+  private static class RecordIndexFileGroupLookupFunction implements 
PairFlatMapFunction<Iterator<String>, String, HoodieRecordGlobalLocation> {
+    private final HoodieTable hoodieTable;
+
+    public RecordIndexFileGroupLookupFunction(HoodieTable hoodieTable) {
+      this.hoodieTable = hoodieTable;
+    }
+
+    @Override
+    public Iterator<Tuple2<String, HoodieRecordGlobalLocation>> 
call(Iterator<String> recordKeyIterator) {
+      List<String> keysToLookup = new ArrayList<>();
+      recordKeyIterator.forEachRemaining(keysToLookup::add);
+
+      // recordIndexInfo object only contains records that are present in 
record_index.
+      Map<String, HoodieRecordGlobalLocation> recordIndexInfo = 
hoodieTable.getMetadataTable().readRecordIndex(keysToLookup);
+
+      HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+      HoodieTimeline commitsTimeline = 
metaClient.getCommitsTimeline().filterCompletedInstants();
+      return recordIndexInfo.entrySet().stream()
+          .filter(e -> HoodieIndexUtils.checkIfValidCommit(commitsTimeline, 
e.getValue().getInstantTime()))

Review Comment:
   Checking for a valid commit might be costly since we do it for every 
   record, so I am trying to see if we can avoid it.
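
   One possible direction, sketched under the assumption that many records 
   in a lookup share the same instant time: cache the result per distinct 
   instant time so the timeline is consulted once per instant rather than 
   once per record. The class below is hypothetical and the delegate 
   predicate stands in for HoodieIndexUtils.checkIfValidCommit:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch: caches the validity result per distinct instant time.
final class MemoizedCommitCheckSketch implements Predicate<String> {
  private final Predicate<String> delegate; // the expensive per-instant check
  private final Map<String, Boolean> cache = new HashMap<>();
  private int delegateCalls = 0; // only here to show how often the delegate runs

  MemoizedCommitCheckSketch(Predicate<String> delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean test(String instantTime) {
    Boolean cached = cache.get(instantTime);
    if (cached == null) {
      delegateCalls++;
      cached = delegate.test(instantTime);
      cache.put(instantTime, cached);
    }
    return cached;
  }

  int delegateCalls() {
    return delegateCalls;
  }

  public static void main(String[] args) {
    // Stand-in for the completed commits timeline.
    Set<String> completed = new HashSet<>(Arrays.asList("001", "002"));
    MemoizedCommitCheckSketch check = new MemoizedCommitCheckSketch(completed::contains);

    // Instant times as they might appear on the looked-up record locations.
    List<String> instantTimes = Arrays.asList("001", "001", "002", "003", "001", "003");
    long valid = instantTimes.stream().filter(check).count();
    System.out.println(valid + " valid records, " + check.delegateCalls() + " timeline checks");
  }
}
```

   The cache is not thread-safe, so it would have to be created per task 
   (for example inside call()) rather than shared.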



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.execution.PartitionIdPassthrough;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the 
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object> 
{
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+  // The index to fallback upon when record index is not initialized yet.
+  // This should be a global index like record index so that the behavior of 
tagging across partitions is not changed.
+  private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE = 
IndexType.GLOBAL_SIMPLE;
+
+  public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public <R> HoodieData<HoodieRecord<R>> 
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context, 
HoodieTable hoodieTable) throws HoodieIndexException {
+    int fileGroupSize;
+    try {
+      
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+      fileGroupSize = 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(hoodieTable.getMetaClient(),
 (HoodieTableFileSystemView) hoodieTable.getFileSystemView(),
+          MetadataPartitionType.RECORD_INDEX.getPartitionPath()).size();
+      ValidationUtils.checkState(fileGroupSize > 0, "Record index should have 
at least one file group");
+    } catch (TableNotFoundException | IllegalStateException e) {
+      // This means that record index has not been initialized.
+      LOG.warn(String.format("Record index not initialized so falling back to 
%s for tagging records", FALLBACK_INDEX_TYPE.name()));
+
+      // Fallback to another index so that tagLocation is still accurate and 
there are no duplicates.
+      HoodieWriteConfig otherConfig = 
HoodieWriteConfig.newBuilder().withProperties(config.getProps())
+          
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(FALLBACK_INDEX_TYPE).build()).build();
+      HoodieIndex fallbackIndex = 
SparkHoodieIndexFactory.createIndex(otherConfig);
+
+      // Fallback index needs to be a global index like record index
+      ValidationUtils.checkArgument(fallbackIndex.isGlobal(), "Fallback index 
needs to be a global index like record index");
+
+      return fallbackIndex.tagLocation(records, context, hoodieTable);
+    }
+
+    // final variable required for lamda functions below
+    final int numFileGroups = fileGroupSize;
+
+    // Partition the record keys to lookup such that each partition looks up 
one record index shard
+    JavaRDD<String> partitionedKeyRDD = HoodieJavaRDD.getJavaRDD(records)
+        .map(HoodieRecord::getRecordKey)
+        .keyBy(k -> HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(k, 
numFileGroups))
+        .partitionBy(new PartitionIdPassthrough(numFileGroups))
+        .map(t -> t._2);
+    ValidationUtils.checkState(partitionedKeyRDD.getNumPartitions() <= 
numFileGroups);
+
+    // Lookup the keys in the record index
+    HoodiePairData<String, HoodieRecordGlobalLocation> keyToLocationPairRDD =
+        HoodieJavaPairRDD.of(partitionedKeyRDD.mapPartitionsToPair(new 
RecordIndexFileGroupLookupFunction(hoodieTable)));
+
+    // Tag the incoming records, as inserts or updates, by joining with 
existing record keys
+    HoodieData<HoodieRecord<R>> taggedRecords = 
tagLocationBackToRecords(keyToLocationPairRDD, records);
+
+    // The number of partitions in the taggedRecords is expected to the 
maximum of the partitions in
+    // keyToLocationPairRDD and records RDD.
+
+    return taggedRecords;
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> 
writeStatuses, HoodieEngineContext context,
+      HoodieTable hoodieTable) {
+    // This is a no-op as metadata record index updates are automatically 
maintained within the metadata table.
+    return writeStatuses;
+  }
+
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    // Only those deltacommits which have a valid completed commit on the 
dataset are read. Since, the instantTime
+    // is being rolled back on the dataset, we will not load the records from 
the deltacommit and it is virtually
+    // rolled back. In other words, there is no need to rollback any 
deltacommit here except if the deltacommit
+    // was compacted and a new basefile has been created.
+    try {
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+          
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath()))
+          .setConf(new Configuration()).build();
+      HoodieTimeline commitTimeline = 
metaClient.getCommitTimeline().filterCompletedInstants();
+      if (commitTimeline.empty()) {
+        // No compaction yet so no need to check for deltacommits due to the 
logic above
+        return true;
+      }
+
+      if (HoodieTimeline.compareTimestamps(instantTime, GREATER_THAN, 
commitTimeline.lastInstant().get().getTimestamp())) {
+        // After the last compaction so no rollback required as per logic above
+        return true;
+      }
+      LOG.warn("Cannot rollback instant " + instantTime + " because the 
corresponding deltacommit has been compacted "
+          + " in " + commitTimeline.lastInstant().get().getTimestamp());
+      return false;
+    } catch (TableNotFoundException e) {
+      // Metadata table is not setup.  Nothing to rollback.  Exit gracefully.
+      LOG.warn("Cannot rollback instant " + instantTime + " as metadata table 
is not found");
+      return true;
+    }
+  }
+
+  @Override
+  public boolean isGlobal() {
+    return true;
+  }
+
+  @Override
+  public boolean canIndexLogFiles() {
+    return true;
+  }
+
+  @Override
+  public boolean isImplicitWithStorage() {
+    return false;
+  }
+
+  private <R> HoodieData<HoodieRecord<R>> tagLocationBackToRecords(
+      HoodiePairData<String, HoodieRecordGlobalLocation> keyFilenamePair,
+      HoodieData<HoodieRecord<R>> records) {
+    HoodiePairData<String, HoodieRecord<R>> keyRecordPairs =
+        records.mapToPair(record -> new ImmutablePair<>(record.getRecordKey(), 
record));
+    // Here as the records might have more data than keyFilenamePairs (some 
row keys' not found in record index),
+    // we will do left outer join.
+    return keyRecordPairs.leftOuterJoin(keyFilenamePair).values()
+        .map(v -> {
+          HoodieRecord<R> record = v.getLeft();
+          Option<HoodieRecordGlobalLocation> location = 
Option.ofNullable(v.getRight().orElse(null));
+          if (!location.isPresent()) {
+            // No location found.
+            return record;
+          }
+          // Ensure the partitionPath is also set correctly in the key
+          if 
(!record.getPartitionPath().equals(location.get().getPartitionPath())) {
+            record = new HoodieAvroRecord(new HoodieKey(record.getRecordKey(), 
location.get().getPartitionPath()), (HoodieRecordPayload) record.getData());
+          }
+
+          // Perform the tagging. Not using HoodieIndexUtils.getTaggedRecord 
to prevent an additional copy which is not necessary for this index.
+          record.unseal();
+          record.setCurrentLocation(location.get());
+          record.seal();
+          return record;
+        });
+  }
+
+  /**
+   * Function that lookups a list of keys in a single shard of the record index
+   */
+  private static class RecordIndexFileGroupLookupFunction implements 
PairFlatMapFunction<Iterator<String>, String, HoodieRecordGlobalLocation> {
+    private final HoodieTable hoodieTable;
+
+    public RecordIndexFileGroupLookupFunction(HoodieTable hoodieTable) {
+      this.hoodieTable = hoodieTable;
+    }
+
+    @Override
+    public Iterator<Tuple2<String, HoodieRecordGlobalLocation>> 
call(Iterator<String> recordKeyIterator) {
+      List<String> keysToLookup = new ArrayList<>();
+      recordKeyIterator.forEachRemaining(keysToLookup::add);
+
+      // recordIndexInfo object only contains records that are present in 
record_index.
+      Map<String, HoodieRecordGlobalLocation> recordIndexInfo = 
hoodieTable.getMetadataTable().readRecordIndex(keysToLookup);
+
+      HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+      HoodieTimeline commitsTimeline = 
metaClient.getCommitsTimeline().filterCompletedInstants();
+      return recordIndexInfo.entrySet().stream()
+          .filter(e -> HoodieIndexUtils.checkIfValidCommit(commitsTimeline, 
e.getValue().getInstantTime()))

Review Comment:
   May I know why we need this? We already ignore uncommitted records while 
reading log records, based on valid instant ranges. So I'd like to know what 
this filter guards against in addition.
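
   For reference, a minimal sketch of what the extra filter amounts to as I 
read it, with plain collections standing in for the completed timeline and the 
record index lookup result. `ValidCommitFilterSketch` and `keepCommitted` are 
hypothetical names for illustration, not Hudi APIs; the call in the diff is 
`HoodieIndexUtils.checkIfValidCommit`, whose exact semantics may differ.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Illustration only: hypothetical names, not part of Hudi.
class ValidCommitFilterSketch {

  // Keeps only the entries whose instant time belongs to a completed commit.
  // keyToInstant maps a record key to the instant time stored in its index entry.
  static Map<String, String> keepCommitted(Map<String, String> keyToInstant,
                                           Set<String> completedInstants) {
    Map<String, String> kept = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : keyToInstant.entrySet()) {
      if (completedInstants.contains(e.getValue())) {
        kept.put(e.getKey(), e.getValue());
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    Map<String, String> lookedUp = new LinkedHashMap<>();
    lookedUp.put("key1", "001"); // instant that completed on the data table
    lookedUp.put("key2", "003"); // instant that never completed
    Set<String> completed = new HashSet<>(Arrays.asList("001", "002"));
    System.out.println(keepCommitted(lookedUp, completed).keySet());
  }
}
```

   If the reader already drops entries from instants outside the valid range, 
this second pass would keep everything it is given, which is why I am asking 
what case it covers.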



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -494,14 +597,8 @@ private Pair<Integer, HoodieData<HoodieRecord>> 
initializeFilesPartition(String
     engineContext.setJobStatus(this.getClass().getSimpleName(), "Creating 
records for MDT FILES partition");
     HoodieData<HoodieRecord> fileListRecords = 
engineContext.parallelize(partitionInfoList, 
partitionInfoList.size()).map(partitionInfo -> {
       Map<String, Long> fileNameToSizeMap = 
partitionInfo.getFileNameToSizeMap();
-      // filter for files that are part of the completed commits
-      Map<String, Long> validFileNameToSizeMap = 
fileNameToSizeMap.entrySet().stream().filter(fileSizePair -> {

Review Comment:
   I see. We have moved it to around L1309.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -143,46 +126,33 @@ public List<String> getAllPartitionPaths() throws 
IOException {
    * @param partitionPath The absolute path of the partition to list
    */
   @Override
-  public FileStatus[] getAllFilesInPartition(Path partitionPath)
-      throws IOException {
-    if (isMetadataTableEnabled) {
-      try {
-        return fetchAllFilesInPartition(partitionPath);
-      } catch (Exception e) {
-        throw new HoodieMetadataException("Failed to retrieve files in 
partition " + partitionPath + " from metadata", e);
-      }
+  public FileStatus[] getAllFilesInPartition(Path partitionPath) throws 
IOException {
+    ValidationUtils.checkArgument(isMetadataTableInitialized);
+    try {
+      return fetchAllFilesInPartition(partitionPath);
+    } catch (Exception e) {
+      throw new HoodieMetadataException("Failed to retrieve files in partition 
" + partitionPath + " from metadata", e);
     }
-
-    FileSystemBackedTableMetadata fileSystemBackedTableMetadata =

Review Comment:
   For example, the first time metadata is enabled in the write config but the 
MDT is yet to be initialized, the FS view in use might instantiate 
HoodieBackedTableMetadata. 
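
   To make the concern concrete, here is a rough sketch of the fallback 
behaviour that the removed branch provided, assuming the caller can tell 
whether the metadata table has been initialized. `ListingFallbackSketch`, 
`FileLister` and `listWithFallback` are hypothetical names used only for 
illustration; they are not Hudi classes, and this is not a proposal for the 
final shape of the code.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: hypothetical names, not part of Hudi.
class ListingFallbackSketch {

  // Stand-in for anything that can list the files of a partition.
  interface FileLister {
    List<String> listFiles(String partitionPath);
  }

  // Uses the metadata-table lister only once the metadata table is initialized,
  // and falls back to direct file system listing otherwise, instead of failing.
  static List<String> listWithFallback(boolean metadataTableInitialized,
                                       FileLister metadataLister,
                                       FileLister fileSystemLister,
                                       String partitionPath) {
    if (!metadataTableInitialized) {
      return fileSystemLister.listFiles(partitionPath);
    }
    return metadataLister.listFiles(partitionPath);
  }

  public static void main(String[] args) {
    FileLister fromMetadata = p -> Arrays.asList(p + "/file-from-mdt.parquet");
    FileLister fromFileSystem = p -> Arrays.asList(p + "/file-from-fs.parquet");
    // First write with metadata enabled: the table is not initialized yet.
    System.out.println(listWithFallback(false, fromMetadata, fromFileSystem, "2023/06/10"));
  }
}
```

   With the new `checkArgument`, the uninitialized case would throw rather than 
take the file system path, so the callers need to pick the right 
implementation up front.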



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
