nsivabalan commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r786179763



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -111,13 +116,19 @@ public HoodieBloomIndex(HoodieWriteConfig config, 
BaseHoodieBloomIndexHelper blo
   private HoodiePairData<HoodieKey, HoodieRecordLocation> lookupIndex(
       HoodiePairData<String, String> partitionRecordKeyPairs, final 
HoodieEngineContext context,
       final HoodieTable hoodieTable) {
-    // Obtain records per partition, in the incoming records
+    // Step 1: Obtain records per partition, in the incoming records
     Map<String, Long> recordsPerPartition = 
partitionRecordKeyPairs.countByKey();
     List<String> affectedPartitionPathList = new 
ArrayList<>(recordsPerPartition.keySet());
 
     // Step 2: Load all involved files as <Partition, filename> pairs
-    List<Pair<String, BloomIndexFileInfo>> fileInfoList =
-        loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
+    List<Pair<String, BloomIndexFileInfo>> fileInfoList;
+    if (config.getBloomIndexPruneByRanges()) {
+      fileInfoList = 
(config.getMetadataConfig().isMetaIndexColumnStatsEnabled()
+          ? loadColumnStats(affectedPartitionPathList, context, hoodieTable)

Review comment:
       Can we rename loadColumnStats to loadInvolvedFilesFromMetadata or 
something similar?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -140,23 +151,83 @@ public HoodieBloomIndex(HoodieWriteConfig config, 
BaseHoodieBloomIndexHelper blo
         .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
         .collect(toList());
 
-    if (config.getBloomIndexPruneByRanges()) {
-      // also obtain file ranges, if range pruning is enabled
-      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for 
file slices (range pruning=on)");
-      return context.map(partitionPathFileIDList, pf -> {
-        try {
-          HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf);
-          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), 
minMaxKeys[0], minMaxKeys[1]));
-        } catch (MetadataNotFoundException me) {
-          LOG.warn("Unable to find range metadata in file :" + pf);
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+    context.setJobStatus(this.getClass().getName(), "Obtain key ranges for 
file slices (range pruning=on)");
+    return context.map(partitionPathFileIDList, pf -> {
+      try {
+        HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf);
+        String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), 
minMaxKeys[0], minMaxKeys[1]));
+      } catch (MetadataNotFoundException me) {
+        LOG.warn("Unable to find range metadata in file :" + pf);
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+      }
+    }, Math.max(partitionPathFileIDList.size(), 1));
+  }
+
+  /**
+   * Get the latest base files for the requested partitions.
+   *
+   * @param partitions  - List of partitions to get the base files for
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie Table
+   * @return List of partition and file column range info pairs
+   */
+  List<Pair<String, BloomIndexFileInfo>> getLatestBaseFilesForPartitions(
+      List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
+    List<Pair<String, String>> partitionPathFileIDList = 
getLatestBaseFilesForAllPartitions(partitions, context,
+        hoodieTable).stream()
+        .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
+        .collect(toList());
+    return partitionPathFileIDList.stream()
+        .map(pf -> Pair.of(pf.getKey(), new 
BloomIndexFileInfo(pf.getValue()))).collect(toList());
+  }
+
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved 
files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be 
loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   * @return List of partition and file column range info pairs
+   */
+  List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+      List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
+    // also obtain file ranges, if range pruning is enabled
+    context.setJobStatus(this.getClass().getName(), "Load meta index key 
ranges for file slices");
+
+    final String keyField = 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+    return context.flatMap(partitions, partitionName -> {
+      List<Pair<String, String>> partitionFileIdList =

Review comment:
       fileIdFileNameList

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId, 
hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, 
Iterator<List<HoodieKeyLookupResult>>> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<HoodieKeyLookupResult>> call(Integer integer, 
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    List<List<HoodieKeyLookupResult>> resultList = new ArrayList<>();
+    Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+    final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+    while (tuple2Iterator.hasNext()) {
+      Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+      final String partitionPath = entry._2.getPartitionPath();
+      final String fileId = entry._1;
+      if (!fileIDBaseFileMap.containsKey(fileId)) {

Review comment:
       Wondering why we need fileIDBaseFileMap. For a given task, we may not 
get repeated files, right? So why not go directly from fileId to 
getLatestBaseFile and then to fileName? Am I missing something here?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId, 
hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, 
Iterator<List<HoodieKeyLookupResult>>> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<HoodieKeyLookupResult>> call(Integer integer, 
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    List<List<HoodieKeyLookupResult>> resultList = new ArrayList<>();
+    Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+    final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+    while (tuple2Iterator.hasNext()) {
+      Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+      final String partitionPath = entry._2.getPartitionPath();
+      final String fileId = entry._1;
+      if (!fileIDBaseFileMap.containsKey(fileId)) {
+        Option<HoodieBaseFile> baseFile = 
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
+        if (!baseFile.isPresent()) {
+          throw new HoodieIndexException("Failed to find the base file for 
partition: " + partitionPath
+              + ", fileId: " + fileId);
+        }
+        fileIDBaseFileMap.put(fileId, baseFile.get());
+      }
+      fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, 
fileIDBaseFileMap.get(fileId).getFileName()),
+          k -> new ArrayList<>()).add(entry._2);
+    }
+    if (fileToKeysMap.isEmpty()) {
+      return Collections.emptyListIterator();
+    }
+
+    List<Pair<String, String>> partitionNameFileNameList =
+        fileToKeysMap.keySet().stream().map(partitionNameFileNamePair -> {
+          return Pair.of(partitionNameFileNamePair.getLeft(), 
partitionNameFileNamePair.getRight());
+        }).collect(Collectors.toList());
+
+    Map<Pair<String, String>, ByteBuffer> fileIDToBloomFilterByteBufferMap =
+        
hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);
+
+    fileToKeysMap.forEach((partitionPathFileIdPair, hoodieKeyList) -> {
+      final String partitionPath = partitionPathFileIdPair.getLeft();
+      final String fileName = partitionPathFileIdPair.getRight();
+      final Pair<String, String> partitionFileNamePair = 
Pair.of(partitionPath, fileName);

Review comment:
       Why do we need this variable explicitly? We can try to use 
partitionPathFileIdPair as is.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
##########
@@ -70,26 +69,44 @@ public static SparkHoodieBloomIndexHelper getInstance() {
     JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
         HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
             .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
-    Map<String, Long> comparisonsPerFileGroup = computeComparisonsPerFileGroup(
-        config, recordsPerPartition, partitionToFileInfo, fileComparisonsRDD, 
context);
-    int inputParallelism =
-        
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
+
+    int inputParallelism = 
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
     int joinParallelism = Math.max(inputParallelism, 
config.getBloomIndexParallelism());
     LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: 
${"
         + config.getBloomIndexParallelism() + "}");
 
-    if (config.useBloomIndexBucketizedChecking()) {
+    JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+    if (config.isMetaIndexBloomFilterEnabled()) {
+      // Step 1: Sort by file id
+      JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
+          fileComparisonsRDD.sortBy(entry -> entry._1, true, joinParallelism);

Review comment:
       Minor: can also use Tuple2::_1 instead of entry -> entry._1.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -171,17 +172,37 @@ protected void commit(HoodieData<HoodieRecord> 
hoodieDataRecords, String partiti
 
   /**
    * Tag each record with the location in the given partition.
-   *
+   * <p>
    * The record is tagged with respective file slice's location based on its 
record key.
    */
-  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, 
String partitionName, int numFileGroups) {
-    List<FileSlice> fileSlices = 
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
partitionName);
-    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, 
String.format("Invalid number of file groups: found=%d, required=%d", 
fileSlices.size(), numFileGroups));
-
-    return recordsRDD.map(r -> {
-      FileSlice slice = 
fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
 numFileGroups));
-      r.setCurrentLocation(new 
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
-      return r;
-    });
+  private JavaRDD<HoodieRecord> prepRecords(Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap) {
+    // The result set
+    JavaRDD<HoodieRecord> rddAllPartitionRecords = null;
+
+    for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> entry : 
partitionRecordsMap.entrySet()) {
+      final String partitionName = entry.getKey().partitionPath();
+      final int fileGroupCount = entry.getKey().getFileGroupCount();
+      HoodieData<HoodieRecord> records = entry.getValue();
+      JavaRDD<HoodieRecord> recordsRDD = (JavaRDD<HoodieRecord>) records.get();
+
+      List<FileSlice> fileSlices =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
partitionName);
+      ValidationUtils.checkArgument(fileSlices.size() == fileGroupCount,
+          String.format("Invalid number of file groups: found=%d, 
required=%d", fileSlices.size(), fileGroupCount));
+
+      JavaRDD<HoodieRecord> rddSinglePartitionRecords = recordsRDD.map(r -> {

Review comment:
       Probably it's intentional; just wanted to confirm. We have a map of 
partition type to list of records to be prepped. So, we have a choice: 
parallelize across different partition types and go sequentially through all 
records within a partition, or go sequentially through the different partition 
types and parallelize across the records within each partition. We are going 
with the latter. 
   L182 to L198

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -125,30 +129,43 @@ private void initIfNeeded() {
     return recordsByKeys.size() == 0 ? Option.empty() : 
recordsByKeys.get(0).getValue();
   }
 
-  protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
getRecordsByKeys(List<String> keys, String partitionName) {
-    Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = 
openReadersIfNeeded(keys.get(0), partitionName);
-    try {
-      List<Long> timings = new ArrayList<>();
-      HoodieFileReader baseFileReader = readers.getKey();
-      HoodieMetadataMergedLogRecordReader logRecordScanner = 
readers.getRight();
+  @Override
+  protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
getRecordsByKeys(List<String> keys,
+                                                                               
              String partitionName) {
+    Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = 
getPartitionFileSlices(partitionName, keys);
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = 
new ArrayList<>();
+    AtomicInteger fileSlicesKeysCount = new AtomicInteger();
+    partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, 
fileSliceKeys) -> {

Review comment:
       Based on our offline sync-up, in this patch we have only one file group 
for every partition in the metadata table. This needs to be revisited when we 
fix this in a follow-up patch where we add more file groups for the col stats 
and bloom index partitions.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -140,23 +151,83 @@ public HoodieBloomIndex(HoodieWriteConfig config, 
BaseHoodieBloomIndexHelper blo
         .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
         .collect(toList());
 
-    if (config.getBloomIndexPruneByRanges()) {
-      // also obtain file ranges, if range pruning is enabled
-      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for 
file slices (range pruning=on)");
-      return context.map(partitionPathFileIDList, pf -> {
-        try {
-          HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf);
-          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), 
minMaxKeys[0], minMaxKeys[1]));
-        } catch (MetadataNotFoundException me) {
-          LOG.warn("Unable to find range metadata in file :" + pf);
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+    context.setJobStatus(this.getClass().getName(), "Obtain key ranges for 
file slices (range pruning=on)");
+    return context.map(partitionPathFileIDList, pf -> {
+      try {
+        HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf);
+        String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), 
minMaxKeys[0], minMaxKeys[1]));
+      } catch (MetadataNotFoundException me) {
+        LOG.warn("Unable to find range metadata in file :" + pf);
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+      }
+    }, Math.max(partitionPathFileIDList.size(), 1));
+  }
+
+  /**
+   * Get the latest base files for the requested partitions.
+   *
+   * @param partitions  - List of partitions to get the base files for
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie Table
+   * @return List of partition and file column range info pairs
+   */
+  List<Pair<String, BloomIndexFileInfo>> getLatestBaseFilesForPartitions(
+      List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
+    List<Pair<String, String>> partitionPathFileIDList = 
getLatestBaseFilesForAllPartitions(partitions, context,
+        hoodieTable).stream()
+        .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
+        .collect(toList());
+    return partitionPathFileIDList.stream()
+        .map(pf -> Pair.of(pf.getKey(), new 
BloomIndexFileInfo(pf.getValue()))).collect(toList());
+  }
+
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved 
files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be 
loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   * @return List of partition and file column range info pairs
+   */
+  List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+      List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
+    // also obtain file ranges, if range pruning is enabled
+    context.setJobStatus(this.getClass().getName(), "Load meta index key 
ranges for file slices");
+
+    final String keyField = 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+    return context.flatMap(partitions, partitionName -> {
+      List<Pair<String, String>> partitionFileIdList =

Review comment:
       By the way, do we even need fileId? I don't see it being used anywhere. 
Can we just fetch fileName only?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -101,4 +116,34 @@ public static HoodieRecord getTaggedRecord(HoodieRecord 
inputRecord, Option<Hood
     }
     return record;
   }
+
+  /**
+   * Given a list of row keys and one file, return only row keys existing in 
that file.
+   *
+   * @param filePath            - File to filter keys from
+   * @param candidateRecordKeys - Candidate keys to filter
+   * @return List of candidate keys that are available in the file
+   */
+  public static List<String> filterKeysFromFile(Path filePath, List<String> 
candidateRecordKeys,

Review comment:
       Do we have unit tests (UTs) for this?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
##########
@@ -46,52 +50,57 @@
 
   private static final Logger LOG = 
LogManager.getLogger(HoodieKeyLookupHandle.class);
 
-  private final HoodieTableType tableType;
-
   private final BloomFilter bloomFilter;
-
   private final List<String> candidateRecordKeys;
-
+  private final Option<String> fileName;
+  private final boolean useMetadataTableIndex;
   private long totalKeysChecked;
 
   public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, 
O> hoodieTable,
-                               Pair<String, String> partitionPathFilePair) {
-    super(config, null, hoodieTable, partitionPathFilePair);
-    this.tableType = hoodieTable.getMetaClient().getTableType();
+                               Pair<String, String> partitionPathFileIDPair) {
+    this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
+  }
+
+  public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, 
O> hoodieTable,
+                               Pair<String, String> partitionPathFileIDPair, 
Option<String> fileName,
+                               boolean useMetadataTableIndex) {
+    super(config, null, hoodieTable, partitionPathFileIDPair);
     this.candidateRecordKeys = new ArrayList<>();
     this.totalKeysChecked = 0;
-    HoodieTimer timer = new HoodieTimer().startTimer();
-
-    try {
-      this.bloomFilter = createNewFileReader().readBloomFilter();
-    } catch (IOException e) {
-      throw new HoodieIndexException(String.format("Error reading bloom filter 
from %s: %s", partitionPathFilePair, e));
+    if (fileName.isPresent()) {
+      
ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()));
+      this.fileName = fileName;
+    } else {
+      this.fileName = Option.empty();
     }
-    LOG.info(String.format("Read bloom filter from %s in %d ms", 
partitionPathFilePair, timer.endTimer()));
+    this.useMetadataTableIndex = useMetadataTableIndex;
+    this.bloomFilter = getBloomFilter();
+

Review comment:
       remove extra line break

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId, 
hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, 
Iterator<List<HoodieKeyLookupResult>>> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<HoodieKeyLookupResult>> call(Integer integer, 
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    List<List<HoodieKeyLookupResult>> resultList = new ArrayList<>();
+    Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();

Review comment:
       Can we add a comment here that the Pair represents the partition path 
and fileName?

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
##########
@@ -150,18 +152,30 @@ protected void commit(HoodieData<HoodieRecord> 
hoodieDataRecords, String partiti
 
   /**
    * Tag each record with the location in the given partition.
-   *
+   * <p>
    * The record is tagged with respective file slice's location based on its 
record key.
    */
-  private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String 
partitionName, int numFileGroups) {
-    List<FileSlice> fileSlices = 
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
partitionName);
-    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, 
String.format("Invalid number of file groups: found=%d, required=%d", 
fileSlices.size(), numFileGroups));
-
-    return records.stream().map(r -> {
-      FileSlice slice = 
fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
 numFileGroups));
-      final String instantTime = slice.isEmpty() ? "I" : "U";
-      r.setCurrentLocation(new HoodieRecordLocation(instantTime, 
slice.getFileId()));
-      return r;
-    }).collect(Collectors.toList());
+  private List<HoodieRecord> prepRecords(Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap) {
+    List<HoodieRecord> allPartitionRecords = null;

Review comment:
       Can you create a follow-up JIRA to unify this code across Flink and 
Spark using HoodieData, if possible?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
##########
@@ -46,52 +50,57 @@
 
   private static final Logger LOG = 
LogManager.getLogger(HoodieKeyLookupHandle.class);
 
-  private final HoodieTableType tableType;
-
   private final BloomFilter bloomFilter;
-
   private final List<String> candidateRecordKeys;
-
+  private final Option<String> fileName;
+  private final boolean useMetadataTableIndex;
   private long totalKeysChecked;
 
   public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, 
O> hoodieTable,
-                               Pair<String, String> partitionPathFilePair) {
-    super(config, null, hoodieTable, partitionPathFilePair);
-    this.tableType = hoodieTable.getMetaClient().getTableType();
+                               Pair<String, String> partitionPathFileIDPair) {
+    this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
+  }
+
+  public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, 
O> hoodieTable,
+                               Pair<String, String> partitionPathFileIDPair, 
Option<String> fileName,
+                               boolean useMetadataTableIndex) {
+    super(config, null, hoodieTable, partitionPathFileIDPair);
     this.candidateRecordKeys = new ArrayList<>();
     this.totalKeysChecked = 0;
-    HoodieTimer timer = new HoodieTimer().startTimer();
-
-    try {
-      this.bloomFilter = createNewFileReader().readBloomFilter();
-    } catch (IOException e) {
-      throw new HoodieIndexException(String.format("Error reading bloom filter 
from %s: %s", partitionPathFilePair, e));
+    if (fileName.isPresent()) {
+      
ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()));
+      this.fileName = fileName;
+    } else {
+      this.fileName = Option.empty();

Review comment:
       We can assign Option.empty() at L55, and then we don't need this else block at all.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId, 
hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, 
Iterator<List<HoodieKeyLookupResult>>> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<HoodieKeyLookupResult>> call(Integer integer, 
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    List<List<HoodieKeyLookupResult>> resultList = new ArrayList<>();
+    Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+    final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+    while (tuple2Iterator.hasNext()) {
+      Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+      final String partitionPath = entry._2.getPartitionPath();
+      final String fileId = entry._1;
+      if (!fileIDBaseFileMap.containsKey(fileId)) {
+        Option<HoodieBaseFile> baseFile = 
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
+        if (!baseFile.isPresent()) {
+          throw new HoodieIndexException("Failed to find the base file for 
partition: " + partitionPath
+              + ", fileId: " + fileId);
+        }
+        fileIDBaseFileMap.put(fileId, baseFile.get());
+      }
+      fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, 
fileIDBaseFileMap.get(fileId).getFileName()),
+          k -> new ArrayList<>()).add(entry._2);
+    }
+    if (fileToKeysMap.isEmpty()) {
+      return Collections.emptyListIterator();
+    }
+
+    List<Pair<String, String>> partitionNameFileNameList =

Review comment:
       Can we simplify this as below?
   ```
   List<Pair<String, String>> partitionNameFileNameList =  
fileToKeysMap.keySet().stream().collect(Collectors.toList())
   ```

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -319,9 +635,90 @@ private static void 
processRollbackMetadata(HoodieActiveTimeline metadataTableTi
     return records;
   }
 
+  /**
+   * Convert rollback action metadata to bloom filter index records.
+   */
+  private static List<HoodieRecord> 
convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+                                                                     
HoodieTableMetaClient dataMetaClient,
+                                                                     
Map<String, List<String>> partitionToDeletedFiles,
+                                                                     
Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                     String 
instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> 
deletedFileList.forEach(deletedFile -> {
+      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+        return;
+      }
+
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName;
+      records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+          new PartitionIndexID(partition), new FileIndexID(deletedFile),
+          instantTime, ByteBuffer.allocate(0), true));
+    }));
+
+    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName;
+      appendedFileMap.forEach((appendedFile, length) -> {
+        if (!FSUtils.isBaseFile(new Path(appendedFile))) {
+          return;
+        }
+        final String pathWithPartition = partitionName + "/" + appendedFile;
+        final Path appendedFilePath = new Path(dataMetaClient.getBasePath(), 
pathWithPartition);
+        try {
+          HoodieFileReader<IndexedRecord> fileReader =
+              
HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), 
appendedFilePath);
+          final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+          if (fileBloomFilter == null) {
+            LOG.error("Failed to read bloom filter for " + appendedFilePath);
+            return;
+          }
+          ByteBuffer bloomByteBuffer = 
ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+          HoodieRecord record = 
HoodieMetadataPayload.createBloomFilterMetadataRecord(
+              new PartitionIndexID(partition), new FileIndexID(appendedFile), 
instantTime,
+              bloomByteBuffer, false);
+          records.add(record);
+          fileReader.close();
+        } catch (IOException e) {
+          LOG.error("Failed to get bloom filter for file: " + 
appendedFilePath);
+        }
+      });
+    });
+    return records;
+  }
+
+  /**
+   * Convert rollback action metadata to column stats index records.
+   */
+  private static List<HoodieRecord> 
convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
+                                                                     
HoodieTableMetaClient datasetMetaClient,
+                                                                     
Map<String, List<String>> partitionToDeletedFiles,
+                                                                     
Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                     String 
instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    List<String> latestColumns = getLatestColumns(datasetMetaClient);
+    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> 
deletedFileList.forEach(deletedFile -> {
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName;
+      if (deletedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+        final String filePathWithPartition = partitionName + "/" + deletedFile;
+        records.addAll(getColumnStats(partition, filePathWithPartition, 
datasetMetaClient,

Review comment:
       Update: getColumnStats already handles how to deal with a deleted file; we 
don't read the file for a deleted entry.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -109,55 +191,97 @@ private HoodieMetadataPayload(String key, int type, 
Map<String, HoodieMetadataFi
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionListRecord(List<String> partitions) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-    partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L,  false)));
+    partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L, false)));
 
     HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+        fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
   /**
    * Create and return a {@code HoodieMetadataPayload} to save list of files 
within a partition.
    *
-   * @param partition The name of the partition
-   * @param filesAdded Mapping of files to their sizes for files which have 
been added to this partition
+   * @param partition    The name of the partition
+   * @param filesAdded   Mapping of files to their sizes for files which have 
been added to this partition
    * @param filesDeleted List of files which have been deleted from this 
partition
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionFilesRecord(String partition,
-                                                                               
Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+                                                                               
Option<Map<String, Long>> filesAdded,
+                                                                               
Option<List<String>> filesDeleted) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
     filesAdded.ifPresent(
         m -> m.forEach((filename, size) -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(size, false))));
     filesDeleted.ifPresent(
-        m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L,  true))));
+        m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L, true))));
 
     HoodieKey key = new HoodieKey(partition, 
MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
+  /**
+   * Create bloom filter metadata record.
+   *
+   * @param partitionName - Partition name
+   * @param baseFileName  - Base file name for which the bloom filter needs to 
persisted
+   * @param timestamp     - Instant timestamp responsible for this record
+   * @param bloomFilter   - Bloom filter for the File
+   * @param isDeleted     - Is the bloom filter no more valid
+   * @return Metadata payload containing the fileID and its bloom filter record
+   */
+  public static HoodieRecord<HoodieMetadataPayload> 
createBloomFilterMetadataRecord(final String partitionName,
+                                                                               
     final String baseFileName,
+                                                                               
     final String timestamp,
+                                                                               
     final ByteBuffer bloomFilter,
+                                                                               
     final boolean isDeleted) {
+    ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR)
+            && FSUtils.isBaseFile(new Path(baseFileName)),

Review comment:
       I see we have this base file check in many places. I'm wondering if we are 
doing too many checks. Can we try to do it once and remove the extraneous checks?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -124,14 +202,111 @@ public static void deleteMetadataTable(String basePath, 
HoodieEngineContext cont
     return records;
   }
 
+  /**
+   * Convert commit action metadata to bloom filter records.
+   *
+   * @param commitMetadata - Commit action metadata
+   * @param dataMetaClient - Meta client for the data table
+   * @param instantTime    - Action instant time
+   * @return List of metadata table records
+   */
+  public static List<HoodieRecord> 
convertMetadataToBloomFilterRecords(HoodieCommitMetadata commitMetadata,
+                                                                       
HoodieTableMetaClient dataMetaClient,
+                                                                       String 
instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, 
writeStats) -> {
+      final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) 
? NON_PARTITIONED_NAME : partitionStatName;
+      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
+      writeStats.forEach(hoodieWriteStat -> {
+        // No action for delta logs
+        if (hoodieWriteStat instanceof HoodieDeltaWriteStat) {
+          return;
+        }
+
+        String pathWithPartition = hoodieWriteStat.getPath();
+        if (pathWithPartition == null) {
+          // Empty partition
+          LOG.error("Failed to find path in write stat to update metadata 
table " + hoodieWriteStat);
+          return;
+        }
+        int offset = partition.equals(NON_PARTITIONED_NAME) ? 
(pathWithPartition.startsWith("/") ? 1 : 0) :
+            partition.length() + 1;
+
+        final String fileName = pathWithPartition.substring(offset);
+        if (!FSUtils.isBaseFile(new Path(fileName))) {
+          return;
+        }
+        ValidationUtils.checkState(!newFiles.containsKey(fileName), "Duplicate 
files in HoodieCommitMetadata");
+
+        final Path writeFilePath = new Path(dataMetaClient.getBasePath(), 
pathWithPartition);
+        try {
+          HoodieFileReader<IndexedRecord> fileReader =
+              
HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), 
writeFilePath);
+          try {
+            final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+            if (fileBloomFilter == null) {
+              LOG.error("Failed to read bloom filter for " + writeFilePath);
+              return;
+            }
+            ByteBuffer bloomByteBuffer = 
ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+            HoodieRecord record = 
HoodieMetadataPayload.createBloomFilterMetadataRecord(
+                partition, fileName, instantTime, bloomByteBuffer, false);
+            records.add(record);
+          } catch (Exception e) {
+            LOG.error("Failed to read bloom filter for " + writeFilePath);
+            return;

Review comment:
       Do we need to fail fast here, rather than moving ahead?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -124,6 +124,37 @@
       .sinceVersion("0.10.0")
       .withDocumentation("Enable full scanning of log files while reading log 
records. If disabled, hudi does look up of only interested entries.");
 
+  public static final ConfigProperty<Boolean> ENABLE_META_INDEX_BLOOM_FILTER = 
ConfigProperty
+      .key(METADATA_PREFIX + ".index.bloom.filter.enable")
+      .defaultValue(false)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Enable indexing user data files bloom filters under 
metadata table. When enabled, "
+          + "metadata table will have a partition to store the bloom filter 
index and will be "
+          + "used during the index lookups.");
+
+  public static final ConfigProperty<Boolean> ENABLE_META_INDEX_COLUMN_STATS = 
ConfigProperty
+      .key(METADATA_PREFIX + ".index.column.stats.enable")
+      .defaultValue(false)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Enable indexing user data files column ranges under 
metadata table key lookups. When "
+          + "enabled, metadata table will have a partition to store the column 
ranges and will "
+          + "used for pruning files during the index lookups.");
+
+  public static final ConfigProperty<Boolean> 
META_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS = ConfigProperty
+      .key(METADATA_PREFIX + ".index.column.stats.all_columns")
+      .defaultValue(false)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Enable indexing user data files column ranges under 
metadata table key lookups. When "
+          + "enabled, metadata table will have a partition to store the column 
ranges and will "
+          + "used for pruning files during the index lookups.");
+

Review comment:
       We might need to support indexing only a subset of columns, right? Will 
that be part of future work? Do we have a tracking JIRA for it, please?
   In fact, I would suggest we be cautious about supporting indexing of all 
columns; at least, prefer to have some constraints. For example, only if the 
total column count is less than, say, 25 or so, can we support indexing all 
columns. If not, only a subset of columns, as configured by the user, should 
be supported.
   
   We can jam more on how to go about this.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId, 
hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, 
Iterator<List<HoodieKeyLookupResult>>> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<HoodieKeyLookupResult>> call(Integer integer, 
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    List<List<HoodieKeyLookupResult>> resultList = new ArrayList<>();
+    Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+    final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+    while (tuple2Iterator.hasNext()) {
+      Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+      final String partitionPath = entry._2.getPartitionPath();
+      final String fileId = entry._1;
+      if (!fileIDBaseFileMap.containsKey(fileId)) {
+        Option<HoodieBaseFile> baseFile = 
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
+        if (!baseFile.isPresent()) {
+          throw new HoodieIndexException("Failed to find the base file for 
partition: " + partitionPath
+              + ", fileId: " + fileId);
+        }
+        fileIDBaseFileMap.put(fileId, baseFile.get());
+      }
+      fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, 
fileIDBaseFileMap.get(fileId).getFileName()),
+          k -> new ArrayList<>()).add(entry._2);
+    }
+    if (fileToKeysMap.isEmpty()) {
+      return Collections.emptyListIterator();
+    }
+
+    List<Pair<String, String>> partitionNameFileNameList =
+        fileToKeysMap.keySet().stream().map(partitionNameFileNamePair -> {
+          return Pair.of(partitionNameFileNamePair.getLeft(), 
partitionNameFileNamePair.getRight());
+        }).collect(Collectors.toList());
+
+    Map<Pair<String, String>, ByteBuffer> fileIDToBloomFilterByteBufferMap =
+        
hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);
+
+    fileToKeysMap.forEach((partitionPathFileIdPair, hoodieKeyList) -> {

Review comment:
       Rename this to partitionPathFileNamePair: the map key pairs the partition 
path with the base file name, not the file ID.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexLazyCheckFunction.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.io.HoodieKeyLookupHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId, 
hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexLazyCheckFunction implements 
Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, 
Iterator<List<HoodieKeyLookupResult>>> {

Review comment:
       Can you add sufficient Javadoc explaining what we do here and how it 
differs from the other batch check function? Also, please add sufficient Javadoc 
for the other class too.

##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -30,27 +30,108 @@
             "doc": "Type of the metadata record",
             "type": "int"
         },
-        {   "name": "filesystemMetadata",
+        {
             "doc": "Contains information about partitions and files within the 
dataset",
-            "type": ["null", {
-               "type": "map",
-               "values": {
+            "name": "filesystemMetadata",
+            "type": [
+                "null",
+                {
+                    "type": "map",
+                    "values": {
+                        "type": "record",
+                        "name": "HoodieMetadataFileInfo",
+                        "fields": [
+                            {
+                                "name": "size",
+                                "type": "long",
+                                "doc": "Size of the file"
+                            },
+                            {
+                                "name": "isDeleted",
+                                "type": "boolean",
+                                "doc": "True if this file has been deleted"
+                            }
+                        ]
+                    }
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of bloom filters for all data files in the 
user table",
+            "name": "BloomFilterMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file bloom filter details",
+                    "name": "HoodieMetadataBloomFilter",
                     "type": "record",
-                    "name": "HoodieMetadataFileInfo",
                     "fields": [
                         {
-                            "name": "size",
-                            "type": "long",
-                            "doc": "Size of the file"
+                            "doc": "Bloom filter type code",
+                            "name": "type",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Instant timestamp when this metadata was 
created/updated",
+                            "name": "timestamp",

Review comment:
       May I know where we use this? We don't have this for the other partitions 
(files / column stats), right?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -319,9 +636,88 @@ private static void 
processRollbackMetadata(HoodieActiveTimeline metadataTableTi
     return records;
   }
 
+  /**
+   * Convert rollback action metadata to bloom filter index records.
+   */
+  private static List<HoodieRecord> 
convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+                                                                     
HoodieTableMetaClient dataMetaClient,
+                                                                     
Map<String, List<String>> partitionToDeletedFiles,
+                                                                     
Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                     String 
instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> 
deletedFileList.forEach(deletedFile -> {
+      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+        return;
+      }
+
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName;
+      records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+          partition, deletedFile, instantTime, ByteBuffer.allocate(0), true));
+    }));
+
+    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName;
+      appendedFileMap.forEach((appendedFile, length) -> {
+        if (!FSUtils.isBaseFile(new Path(appendedFile))) {
+          return;
+        }
+        final String pathWithPartition = partitionName + "/" + appendedFile;
+        final Path appendedFilePath = new Path(dataMetaClient.getBasePath(), 
pathWithPartition);
+        try {
+          HoodieFileReader<IndexedRecord> fileReader =

Review comment:
       We can probably create a private method to fetch the 
BloomFilterMetadataRecord for a given file and use it everywhere it is required.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -109,55 +191,97 @@ private HoodieMetadataPayload(String key, int type, 
Map<String, HoodieMetadataFi
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionListRecord(List<String> partitions) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-    partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L,  false)));
+    partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L, false)));
 
     HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+        fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
   /**
    * Create and return a {@code HoodieMetadataPayload} to save list of files 
within a partition.
    *
-   * @param partition The name of the partition
-   * @param filesAdded Mapping of files to their sizes for files which have 
been added to this partition
+   * @param partition    The name of the partition
+   * @param filesAdded   Mapping of files to their sizes for files which have 
been added to this partition
    * @param filesDeleted List of files which have been deleted from this 
partition
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionFilesRecord(String partition,
-                                                                               
Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+                                                                               
Option<Map<String, Long>> filesAdded,
+                                                                               
Option<List<String>> filesDeleted) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
     filesAdded.ifPresent(
         m -> m.forEach((filename, size) -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(size, false))));
     filesDeleted.ifPresent(
-        m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L,  true))));
+        m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L, true))));
 
     HoodieKey key = new HoodieKey(partition, 
MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
+  /**
+   * Create bloom filter metadata record.
+   *
+   * @param partitionName - Partition name
+   * @param baseFileName  - Base file name for which the bloom filter needs to 
persisted
+   * @param timestamp     - Instant timestamp responsible for this record
+   * @param bloomFilter   - Bloom filter for the File
+   * @param isDeleted     - Is the bloom filter no more valid
+   * @return Metadata payload containing the fileID and its bloom filter record
+   */
+  public static HoodieRecord<HoodieMetadataPayload> 
createBloomFilterMetadataRecord(final String partitionName,
+                                                                               
     final String baseFileName,
+                                                                               
     final String timestamp,
+                                                                               
     final ByteBuffer bloomFilter,
+                                                                               
     final boolean isDeleted) {
+    ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR)
+            && FSUtils.isBaseFile(new Path(baseFileName)),
+        "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
+    final String bloomFilterKey = new 
PartitionIndexID(partitionName).asBase64EncodedString()
+        .concat(new FileIndexID(baseFileName).asBase64EncodedString());
+    HoodieKey key = new HoodieKey(bloomFilterKey, 
MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+
+    // TODO: Get the bloom filter type from the file
+    HoodieMetadataBloomFilter metadataBloomFilter =
+        new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(),
+            timestamp, bloomFilter, isDeleted);
+    HoodieMetadataPayload metadataPayload = new 
HoodieMetadataPayload(key.getRecordKey(),
+        HoodieMetadataPayload.METADATA_TYPE_BLOOM_FILTER, metadataBloomFilter);
+    return new HoodieRecord<>(key, metadataPayload);
+  }
+
   @Override
   public HoodieMetadataPayload preCombine(HoodieMetadataPayload 
previousRecord) {
     ValidationUtils.checkArgument(previousRecord.type == type,
-        "Cannot combine " + previousRecord.type  + " with " + type);
-
-    Map<String, HoodieMetadataFileInfo> combinedFileInfo = null;
+        "Cannot combine " + previousRecord.type + " with " + type);
 
     switch (type) {
-      case PARTITION_LIST:
-      case FILE_LIST:
-        combinedFileInfo = combineFilesystemMetadata(previousRecord);
-        break;
+      case METADATA_TYPE_PARTITION_LIST:
+      case METADATA_TYPE_FILE_LIST:
+        Map<String, HoodieMetadataFileInfo> combinedFileInfo = 
combineFilesystemMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, combinedFileInfo);
+      case METADATA_TYPE_BLOOM_FILTER:
+        HoodieMetadataBloomFilter combineBloomFilterMetadata = 
combineBloomFilterMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, 
combineBloomFilterMetadata);
+      case METADATA_TYPE_COLUMN_STATS:
+        return new HoodieMetadataPayload(key, type, 
combineColumnStats(previousRecord));
       default:
         throw new HoodieMetadataException("Unknown type of 
HoodieMetadataPayload: " + type);
     }
+  }
+
+  private HoodieMetadataBloomFilter 
combineBloomFilterMetadata(HoodieMetadataPayload previousRecord) {
+    return this.bloomFilterMetadata;
+  }

Review comment:
       The reason we are going with this is that one metadata payload can contain
only one record pertaining to just one file. Hence, it will either be a new file
or it will represent a deleted file, and so we are good.
   But it is a different case with file system metadata, because one record (e.g.
for partition p1) might have had 3 files in record1 and could have just 2 in
record2, and so we need detailed combine logic there.
   That is not required for column stats or bloom filters.
   
   Is my understanding right?
   

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
##########
@@ -70,26 +69,44 @@ public static SparkHoodieBloomIndexHelper getInstance() {
     JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
         HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
             .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
-    Map<String, Long> comparisonsPerFileGroup = computeComparisonsPerFileGroup(
-        config, recordsPerPartition, partitionToFileInfo, fileComparisonsRDD, 
context);
-    int inputParallelism =
-        
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
+
+    int inputParallelism = 
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
     int joinParallelism = Math.max(inputParallelism, 
config.getBloomIndexParallelism());
     LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: 
${"
         + config.getBloomIndexParallelism() + "}");
 
-    if (config.useBloomIndexBucketizedChecking()) {
+    JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+    if (config.isMetaIndexBloomFilterEnabled()) {
+      // Step 1: Sort by file id
+      JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
+          fileComparisonsRDD.sortBy(entry -> entry._1, true, joinParallelism);
+
+      // Step 2: Use bloom filter to filter and the actual log file to get the 
record location
+      final boolean isBloomFiltersBatchLoadEnabled = 
config.isMetaIndexBloomFilterBatchLoadEnabled();
+      if (isBloomFiltersBatchLoadEnabled) {

Review comment:
       Something to think about; not required in this patch.
   Similar to how we compute comparisonsPerFileGroup, we can try to fetch how many
file groups are involved and then make an informed decision to go with batch
load or lazy evaluation here.
   
   

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,6 +156,111 @@ protected BaseTableMetadata(HoodieEngineContext 
engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final String partitionName, final 
String fileName)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta index for bloom filters is disabled!");
+      return Option.empty();
+    }
+
+    final Pair<String, String> partitionFileName = Pair.of(partitionName, 
fileName);
+    Map<Pair<String, String>, ByteBuffer> bloomFilters = 
getBloomFilters(Collections.singletonList(partitionFileName));
+    if (bloomFilters.isEmpty()) {
+      LOG.error("Meta index: missing bloom filter for partition: " + 
partitionName + ", file: " + fileName);
+      return Option.empty();
+    }
+
+    ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+    return Option.of(bloomFilters.get(partitionFileName));
+  }
+
+  @Override
+  public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta index for bloom filter is disabled!");
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+    Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+    partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+          final String bloomKey = new 
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()
+              .concat(new 
FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString());
+          partitionIDFileIDSortedStrings.add(bloomKey);
+          fileToKeyMap.put(bloomKey, partitionNameFileNamePair);
+        }
+    );
+
+    List<String> partitionIDFileIDStrings = new 
ArrayList<>(partitionIDFileIDSortedStrings);
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
hoodieRecordList =
+        getRecordsByKeys(partitionIDFileIDStrings, 
MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, 
timer.endTimer()));
+
+    Map<Pair<String, String>, ByteBuffer> partitionFileToBloomFilterMap = new 
HashMap<>();
+    for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry 
: hoodieRecordList) {
+      if (entry.getRight().isPresent()) {
+        final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
+            entry.getRight().get().getData().getBloomFilterMetadata();
+        if (bloomFilterMetadata.isPresent()) {
+          if (!bloomFilterMetadata.get().getIsDeleted()) {
+            
ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
+            
partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()), 
bloomFilterMetadata.get().getBloomFilter());
+          }
+        } else {
+          LOG.error("Meta index bloom filter missing for: " + 
fileToKeyMap.get(entry.getLeft()));
+        }
+      }
+    }
+    return partitionFileToBloomFilterMap;
+  }
+
+  @Override
+  public Map<Pair<String, String>, HoodieColumnStats> getColumnStats(final 
List<Pair<String, String>> partitionNameFileNameList, final String columnName)
+      throws HoodieMetadataException {
+    if (!isMetaIndexColumnStatsEnabled) {
+      LOG.error("Meta index for column stats is disabled!");
+      return Collections.emptyMap();
+    }
+
+    Map<String, Pair<String, String>> columnStatKeyToFileNameMap = new 
HashMap<>();
+    List<String> columnStatKeys = new ArrayList<>();
+    final String columnIndexStr = new 
ColumnIndexID(columnName).asBase64EncodedString();
+    for (Pair<String, String> partitionNameFileNamePair : 
partitionNameFileNameList) {
+      final String columnStatIndexKey = columnIndexStr
+          .concat(new 
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString())
+          .concat(new 
FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString());
+      columnStatKeys.add(columnStatIndexKey);

Review comment:
       Shouldn't we add this to a TreeSet first and then add it to the list? We need
this to be sorted, in my understanding.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -399,4 +795,119 @@ public static int mapRecordKeyToFileGroupIndex(String 
recordKey, int numFileGrou
     return fileSliceStream.sorted((s1, s2) -> 
s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList());
   }
 
+  public static List<HoodieRecord> 
convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
+                                                                       
HoodieEngineContext engineContext,
+                                                                       
HoodieTableMetaClient dataMetaClient,
+                                                                       boolean 
isMetaIndexColumnStatsForAllColumns,
+                                                                       String 
instantTime) {
+
+    try {
+      List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+          .flatMap(entry -> entry.stream()).collect(Collectors.toList());
+      return 
HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, 
dataMetaClient, allWriteStats,
+          isMetaIndexColumnStatsForAllColumns);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate column stats records for 
metadata table ", e);
+    }
+  }
+
+  /**
+   * Create column stats from write status.
+   *
+   * @param engineContext                       - Enging context
+   * @param datasetMetaClient                   - Dataset meta client
+   * @param allWriteStats                       - Write status to convert
+   * @param isMetaIndexColumnStatsForAllColumns - Are all columns enabled for 
indexing
+   */
+  public static List<HoodieRecord> 
createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
+                                                                   
HoodieTableMetaClient datasetMetaClient,
+                                                                   
List<HoodieWriteStat> allWriteStats,
+                                                                   boolean 
isMetaIndexColumnStatsForAllColumns) throws Exception {
+    if (allWriteStats.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    List<HoodieWriteStat> prunedWriteStats = 
allWriteStats.stream().filter(writeStat -> {
+      return !(writeStat instanceof HoodieDeltaWriteStat);
+    }).collect(Collectors.toList());
+    if (prunedWriteStats.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    return engineContext.flatMap(prunedWriteStats,
+        writeStat -> translateWriteStatToColumnStats(writeStat, 
datasetMetaClient,
+            getLatestColumns(datasetMetaClient, 
isMetaIndexColumnStatsForAllColumns)),
+        prunedWriteStats.size());
+  }
+
+  /**
+   * Get the latest columns for the table for column stats indexing.
+   *
+   * @param datasetMetaClient                   - Data table meta client
+   * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing 
enabled for all columns
+   */
+  private static List<String> getLatestColumns(HoodieTableMetaClient 
datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
+    if (!isMetaIndexColumnStatsForAllColumns) {
+      return 
Collections.singletonList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp());
+    }
+
+    if 
(datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants()
 > 1) {
+      TableSchemaResolver schemaResolver = new 
TableSchemaResolver(datasetMetaClient);
+      // consider nested fields as well. if column stats is enabled only for a 
subset of columns,
+      // directly use them instead of all columns from the latest table schema
+      try {
+        return schemaResolver.getTableAvroSchema().getFields().stream()
+            .map(entry -> entry.name()).collect(Collectors.toList());
+      } catch (Exception e) {
+        throw new HoodieException("Failed to get latest columns for " + 
datasetMetaClient.getBasePath());
+      }
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  private static List<String> getLatestColumns(HoodieTableMetaClient 
datasetMetaClient) {
+    return getLatestColumns(datasetMetaClient, false);

Review comment:
       So, it looks like we index only the record key in this patch, is that right?
   I mean, we don't support adding an index for all columns. Is my understanding
right?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -109,55 +191,97 @@ private HoodieMetadataPayload(String key, int type, 
Map<String, HoodieMetadataFi
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionListRecord(List<String> partitions) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-    partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L,  false)));
+    partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L, false)));
 
     HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+        fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
   /**
    * Create and return a {@code HoodieMetadataPayload} to save list of files 
within a partition.
    *
-   * @param partition The name of the partition
-   * @param filesAdded Mapping of files to their sizes for files which have 
been added to this partition
+   * @param partition    The name of the partition
+   * @param filesAdded   Mapping of files to their sizes for files which have 
been added to this partition
    * @param filesDeleted List of files which have been deleted from this 
partition
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionFilesRecord(String partition,
-                                                                               
Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+                                                                               
Option<Map<String, Long>> filesAdded,
+                                                                               
Option<List<String>> filesDeleted) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
     filesAdded.ifPresent(
         m -> m.forEach((filename, size) -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(size, false))));
     filesDeleted.ifPresent(
-        m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L,  true))));
+        m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L, true))));
 
     HoodieKey key = new HoodieKey(partition, 
MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
+  /**
+   * Create bloom filter metadata record.
+   *
+   * @param partitionName - Partition name
+   * @param baseFileName  - Base file name for which the bloom filter needs to 
persisted
+   * @param timestamp     - Instant timestamp responsible for this record
+   * @param bloomFilter   - Bloom filter for the File
+   * @param isDeleted     - Is the bloom filter no more valid
+   * @return Metadata payload containing the fileID and its bloom filter record
+   */
+  public static HoodieRecord<HoodieMetadataPayload> 
createBloomFilterMetadataRecord(final String partitionName,
+                                                                               
     final String baseFileName,
+                                                                               
     final String timestamp,
+                                                                               
     final ByteBuffer bloomFilter,
+                                                                               
     final boolean isDeleted) {
+    ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR)
+            && FSUtils.isBaseFile(new Path(baseFileName)),
+        "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
+    final String bloomFilterKey = new 
PartitionIndexID(partitionName).asBase64EncodedString()
+        .concat(new FileIndexID(baseFileName).asBase64EncodedString());
+    HoodieKey key = new HoodieKey(bloomFilterKey, 
MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+
+    // TODO: Get the bloom filter type from the file
+    HoodieMetadataBloomFilter metadataBloomFilter =
+        new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(),
+            timestamp, bloomFilter, isDeleted);
+    HoodieMetadataPayload metadataPayload = new 
HoodieMetadataPayload(key.getRecordKey(),
+        HoodieMetadataPayload.METADATA_TYPE_BLOOM_FILTER, metadataBloomFilter);
+    return new HoodieRecord<>(key, metadataPayload);
+  }
+
   @Override
   public HoodieMetadataPayload preCombine(HoodieMetadataPayload 
previousRecord) {
     ValidationUtils.checkArgument(previousRecord.type == type,
-        "Cannot combine " + previousRecord.type  + " with " + type);
-
-    Map<String, HoodieMetadataFileInfo> combinedFileInfo = null;
+        "Cannot combine " + previousRecord.type + " with " + type);
 
     switch (type) {
-      case PARTITION_LIST:
-      case FILE_LIST:
-        combinedFileInfo = combineFilesystemMetadata(previousRecord);
-        break;
+      case METADATA_TYPE_PARTITION_LIST:
+      case METADATA_TYPE_FILE_LIST:
+        Map<String, HoodieMetadataFileInfo> combinedFileInfo = 
combineFilesystemMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, combinedFileInfo);
+      case METADATA_TYPE_BLOOM_FILTER:
+        HoodieMetadataBloomFilter combineBloomFilterMetadata = 
combineBloomFilterMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, 
combineBloomFilterMetadata);
+      case METADATA_TYPE_COLUMN_STATS:
+        return new HoodieMetadataPayload(key, type, 
combineColumnStats(previousRecord));

Review comment:
       nit: combineColumnStatsMetadata

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
##########
@@ -70,26 +69,44 @@ public static SparkHoodieBloomIndexHelper getInstance() {
     JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
         HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
             .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
-    Map<String, Long> comparisonsPerFileGroup = computeComparisonsPerFileGroup(
-        config, recordsPerPartition, partitionToFileInfo, fileComparisonsRDD, 
context);
-    int inputParallelism =
-        
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
+
+    int inputParallelism = 
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
     int joinParallelism = Math.max(inputParallelism, 
config.getBloomIndexParallelism());
     LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: 
${"
         + config.getBloomIndexParallelism() + "}");
 
-    if (config.useBloomIndexBucketizedChecking()) {
+    JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+    if (config.isMetaIndexBloomFilterEnabled()) {
+      // Step 1: Sort by file id
+      JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
+          fileComparisonsRDD.sortBy(entry -> entry._1, true, joinParallelism);
+
+      // Step 2: Use bloom filter to filter and the actual log file to get the 
record location
+      final boolean isBloomFiltersBatchLoadEnabled = 
config.isMetaIndexBloomFilterBatchLoadEnabled();
+      if (isBloomFiltersBatchLoadEnabled) {

Review comment:
       Because I see that batch load is enabled by default. For larger datasets,
we should probably go with lazy evaluation.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -735,14 +768,19 @@ protected void bootstrapCommit(List<DirectoryInfo> 
partitionInfoList, String cre
     List<String> partitions = partitionInfoList.stream().map(p ->
         p.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME : 
p.getRelativePath()).collect(Collectors.toList());
     final int totalFiles = partitionInfoList.stream().mapToInt(p -> 
p.getTotalFiles()).sum();
+    final Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordsMap = new HashMap<>();
 
     // Record which saves the list of all partitions
     HoodieRecord allPartitionRecord = 
HoodieMetadataPayload.createPartitionListRecord(partitions);
     if (partitions.isEmpty()) {
-      // in case of boostrapping of a fresh table, there won't be any 
partitions, but we need to make a boostrap commit
-      
commit(engineContext.parallelize(Collections.singletonList(allPartitionRecord), 
1), MetadataPartitionType.FILES.partitionPath(), createInstantTime, false);
+      // in case of bootstrapping of a fresh table, there won't be any 
partitions, but we need to make a boostrap commit
+      final HoodieData<HoodieRecord> allPartitionRecordsRDD = 
engineContext.parallelize(
+          Collections.singletonList(allPartitionRecord), 1);
+      partitionToRecordsMap.put(MetadataPartitionType.FILES, 
allPartitionRecordsRDD);
+      commit(createInstantTime, partitionToRecordsMap, false);

Review comment:
       Can you please link the JIRA here?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -109,6 +109,7 @@
   protected boolean enabled;
   protected SerializableConfiguration hadoopConf;
   protected final transient HoodieEngineContext engineContext;
+  protected final List<MetadataPartitionType> enabledPartitionTypes;

Review comment:
       Can you please link the tracking JIRA here so that we don't lose track of it?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId, 
hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, 
Iterator<List<HoodieKeyLookupResult>>> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<HoodieKeyLookupResult>> call(Integer integer, 
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    List<List<HoodieKeyLookupResult>> resultList = new ArrayList<>();
+    Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+    final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+    while (tuple2Iterator.hasNext()) {
+      Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+      final String partitionPath = entry._2.getPartitionPath();
+      final String fileId = entry._1;
+      if (!fileIDBaseFileMap.containsKey(fileId)) {
+        Option<HoodieBaseFile> baseFile = 
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
+        if (!baseFile.isPresent()) {
+          throw new HoodieIndexException("Failed to find the base file for 
partition: " + partitionPath
+              + ", fileId: " + fileId);
+        }
+        fileIDBaseFileMap.put(fileId, baseFile.get());
+      }
+      fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, 
fileIDBaseFileMap.get(fileId).getFileName()),
+          k -> new ArrayList<>()).add(entry._2);
+    }
+    if (fileToKeysMap.isEmpty()) {
+      return Collections.emptyListIterator();
+    }
+
+    List<Pair<String, String>> partitionNameFileNameList =

Review comment:
       Alternatively, copy the key set directly:
   ```
       List<Pair<String, String>> partitionNameFileNameList = new 
ArrayList<>(fileToKeysMap.keySet());
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to