prashantwason commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r792019607



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -1444,6 +1444,14 @@ public boolean useBloomIndexBucketizedChecking() {
     return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
   }
 
+  public boolean isMetadataIndexBloomFilterEnabled() {
+    return isMetadataTableEnabled() && 
getMetadataConfig().isMetadataIndexBloomFilterEnabled();
+  }
+
+  public boolean isMetadataIndexColumnStatsForAllColumns() {

Review comment:
       No enabled suffix here? For consistency with other such configs, maybe 
rename to isMetadataColumnStatsIndexEnabled.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -133,30 +144,89 @@ public HoodieBloomIndex(HoodieWriteConfig config, 
BaseHoodieBloomIndexHelper blo
   /**
    * Load all involved files as <Partition, filename> pair List.
    */
-  List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(
+  List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
       List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
     // Obtain the latest data files from all the partitions.
     List<Pair<String, String>> partitionPathFileIDList = 
getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
         .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
         .collect(toList());
 
-    if (config.getBloomIndexPruneByRanges()) {
-      // also obtain file ranges, if range pruning is enabled
-      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for 
file slices (range pruning=on)");
-      return context.map(partitionPathFileIDList, pf -> {
-        try {
-          HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf);
-          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), 
minMaxKeys[0], minMaxKeys[1]));
-        } catch (MetadataNotFoundException me) {
-          LOG.warn("Unable to find range metadata in file :" + pf);
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+    context.setJobStatus(this.getClass().getName(), "Obtain key ranges for 
file slices (range pruning=on)");
+    return context.map(partitionPathFileIDList, pf -> {
+      try {
+        HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf);
+        String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), 
minMaxKeys[0], minMaxKeys[1]));
+      } catch (MetadataNotFoundException me) {
+        LOG.warn("Unable to find range metadata in file :" + pf);
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+      }
+    }, Math.max(partitionPathFileIDList.size(), 1));
+  }
+
+  /**
+   * Get BloomIndexFileInfo for all the latest base files for the requested 
partitions.
+   *
+   * @param partitions  - List of partitions to get the base files for
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie Table
+   * @return List of partition and file column range info pairs
+   */
+  private List<Pair<String, BloomIndexFileInfo>> getFileInfoForLatestBaseFiles(
+      List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
+    List<Pair<String, String>> partitionPathFileIDList = 
getLatestBaseFilesForAllPartitions(partitions, context,
+        hoodieTable).stream()
+        .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
+        .collect(toList());
+    return partitionPathFileIDList.stream()
+        .map(pf -> Pair.of(pf.getKey(), new 
BloomIndexFileInfo(pf.getValue()))).collect(toList());
+  }
+
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved 
files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be 
loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   * @return List of partition and file column range info pairs
+   */
+  protected List<Pair<String, BloomIndexFileInfo>> 
loadColumnRangesFromMetaIndex(
+      List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
+    // also obtain file ranges, if range pruning is enabled
+    context.setJobStatus(this.getClass().getName(), "Load meta index key 
ranges for file slices");
+
+    final String keyField = 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+    return context.flatMap(partitions, partitionName -> {
+      List<String> partitionFileNameList = 
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
+              hoodieTable).stream().map(baseFile -> baseFile.getFileName())
+          .collect(toList());
+      try {
+        List<Pair<String, String>> columnStatKeys = new ArrayList<>();

Review comment:
       Simpler way to implement this is to stream this on the 
partitionFileNameList instead of a for-loop.
   
   HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName, 
hoodieTable).stream().map(baseFile -> baseFile.getFileName())
       .map(fileName -> new Pair.of(partitionName, fileName))
       .sorted()
       .collect(toList());

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -101,4 +116,34 @@ public static HoodieRecord getTaggedRecord(HoodieRecord 
inputRecord, Option<Hood
     }
     return record;
   }
+
+  /**
+   * Given a list of row keys and one file, return only row keys existing in 
that file.
+   *
+   * @param filePath            - File to filter keys from
+   * @param candidateRecordKeys - Candidate keys to filter
+   * @return List of candidate keys that are available in the file
+   */
+  public static List<String> filterKeysFromFile(Path filePath, List<String> 
candidateRecordKeys,
+                                                Configuration configuration) 
throws HoodieIndexException {
+    ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
+    List<String> foundRecordKeys = new ArrayList<>();
+    try {
+      // Load all rowKeys from the file, to double-confirm
+      if (!candidateRecordKeys.isEmpty()) {
+        HoodieTimer timer = new HoodieTimer().startTimer();
+        HoodieFileReader fileReader = 
HoodieFileReaderFactory.getFileReader(configuration, filePath);
+        Set<String> fileRowKeys = fileReader.filterKeys(new 
TreeSet<>(candidateRecordKeys));

Review comment:
       TreeSet or HashMap here? Lookups are much faster in HashMap

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -128,8 +130,21 @@
     this.dataWriteConfig = writeConfig;
     this.engineContext = engineContext;
     this.hadoopConf = new SerializableConfiguration(hadoopConf);
+    this.metrics = Option.empty();
+    this.enabledPartitionTypes = new ArrayList<>();
 
     if (writeConfig.isMetadataTableEnabled()) {
+      this.enabledPartitionTypes.add(MetadataPartitionType.FILES);
+      if (writeConfig.getMetadataConfig().isMetadataIndexBloomFilterEnabled()) 
{
+        MetadataPartitionType.BLOOM_FILTERS.setFileGroupCount(
+            
writeConfig.getMetadataConfig().getMetadataIndexBloomFilterFileGroupCount());
+        this.enabledPartitionTypes.add(MetadataPartitionType.BLOOM_FILTERS);
+      }
+      if (writeConfig.getMetadataConfig().isMetadataIndexColumnStatsEnabled()) 
{
+        MetadataPartitionType.COLUMN_STATS.setFileGroupCount(

Review comment:
       Same as above. We should not require this to be specified all the time.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Spark Function2 implementation for checking bloom filters for the
+ * requested keys from the metadata table index. The bloom filter
+ * checking for keys and the actual file verification for the
+ * candidate keys is done in an iterative fashion. In each iteration,
+ * bloom filters are requested for a batch of partition files and the
+ * keys are checked against them.
+ */
+public class HoodieMetadataBloomIndexCheckFunction implements

Review comment:
       I did not get why a new file for metadata specific bloom check is 
required? 
   
   

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -171,17 +172,34 @@ protected void commit(HoodieData<HoodieRecord> 
hoodieDataRecords, String partiti
 
   /**
    * Tag each record with the location in the given partition.
-   *
+   * <p>
    * The record is tagged with respective file slice's location based on its 
record key.
    */
-  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, 
String partitionName, int numFileGroups) {
-    List<FileSlice> fileSlices = 
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
partitionName);
-    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, 
String.format("Invalid number of file groups: found=%d, required=%d", 
fileSlices.size(), numFileGroups));
-
-    return recordsRDD.map(r -> {
-      FileSlice slice = 
fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
 numFileGroups));
-      r.setCurrentLocation(new 
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
-      return r;
-    });
+  private JavaRDD<HoodieRecord> prepRecords(Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap) {

Review comment:
       This can be moved to the base class. Some ideas applicable here:
     1. HoodieData.map() to tag records instead of RDD.map
     2. HoodieData.union() instead of RDD.union
   
   If so, you dont need a flink specific implementation too.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,12 +156,119 @@ protected BaseTableMetadata(HoodieEngineContext 
engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final String partitionName, final 
String fileName)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta index for bloom filters is disabled!");
+      return Option.empty();
+    }
+
+    final Pair<String, String> partitionFileName = Pair.of(partitionName, 
fileName);
+    Map<Pair<String, String>, ByteBuffer> bloomFilters = 
getBloomFilters(Collections.singletonList(partitionFileName));
+    if (bloomFilters.isEmpty()) {
+      LOG.error("Meta index: missing bloom filter for partition: " + 
partitionName + ", file: " + fileName);
+      return Option.empty();
+    }
+
+    ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+    return Option.of(bloomFilters.get(partitionFileName));
+  }
+
+  @Override
+  public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta index for bloom filter is disabled!");
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+    Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+    partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+          final String bloomKey = new 
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()
+              .concat(new 
FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString());
+          partitionIDFileIDSortedStrings.add(bloomKey);
+          fileToKeyMap.put(bloomKey, partitionNameFileNamePair);
+        }
+    );
+
+    List<String> partitionIDFileIDStrings = new 
ArrayList<>(partitionIDFileIDSortedStrings);
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
hoodieRecordList =
+        getRecordsByKeys(partitionIDFileIDStrings, 
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, 
timer.endTimer()));
+
+    Map<Pair<String, String>, ByteBuffer> partitionFileToBloomFilterMap = new 
HashMap<>();
+    for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry 
: hoodieRecordList) {
+      if (entry.getRight().isPresent()) {
+        final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
+            entry.getRight().get().getData().getBloomFilterMetadata();
+        if (bloomFilterMetadata.isPresent()) {
+          if (!bloomFilterMetadata.get().getIsDeleted()) {
+            
ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
+            
partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()), 
bloomFilterMetadata.get().getBloomFilter());

Review comment:
       Shoudn't we return the actual bloom filter implementation here instead 
of the ByteArray?
   
   I see that in the schema you also save "Bloom filter type code" which means 
that different types of bloom filters are possible. So how will the caller know 
which bloom filter type to instantiate using the byte array?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -133,21 +138,43 @@ public BloomFilter readBloomFilter() {
 
   @Override
   public Set<String> filterRowKeys(Set candidateRowKeys) {
-    // Current implementation reads all records and filters them. In certain 
cases, it many be better to:
-    //  1. Scan a limited subset of keys (min/max range of candidateRowKeys)
-    //  2. Lookup keys individually (if the size of candidateRowKeys is much 
less than the total keys in file)
     try {
-      List<Pair<String, R>> allRecords = readAllRecords();
-      Set<String> rowKeys = new HashSet<>();
-      allRecords.forEach(t -> {
-        if (candidateRowKeys.contains(t.getFirst())) {
-          rowKeys.add(t.getFirst());
-        }
-      });
-      return rowKeys;
+      return filterKeys(new TreeSet<>(candidateRowKeys));
     } catch (IOException e) {
-      throw new HoodieIOException("Failed to read row keys from " + path, e);
+      LOG.error("Failed to fetch keys from " + path);
+    }
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Set<String> filterKeys(TreeSet<String> sortedCandidateRowKeys) throws 
IOException {
+    return filterRecordsImpl(sortedCandidateRowKeys).keySet();

Review comment:
       There is a faster way to simply filter keys - when scanning to a key, if 
the key does not exist then the scan fails. So there is no need to read the 
entire records.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -133,30 +144,89 @@ public HoodieBloomIndex(HoodieWriteConfig config, 
BaseHoodieBloomIndexHelper blo
   /**
    * Load all involved files as <Partition, filename> pair List.
    */
-  List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(
+  List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
       List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
     // Obtain the latest data files from all the partitions.
     List<Pair<String, String>> partitionPathFileIDList = 
getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
         .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
         .collect(toList());
 
-    if (config.getBloomIndexPruneByRanges()) {
-      // also obtain file ranges, if range pruning is enabled
-      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for 
file slices (range pruning=on)");
-      return context.map(partitionPathFileIDList, pf -> {
-        try {
-          HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf);
-          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), 
minMaxKeys[0], minMaxKeys[1]));
-        } catch (MetadataNotFoundException me) {
-          LOG.warn("Unable to find range metadata in file :" + pf);
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+    context.setJobStatus(this.getClass().getName(), "Obtain key ranges for 
file slices (range pruning=on)");
+    return context.map(partitionPathFileIDList, pf -> {
+      try {
+        HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf);
+        String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), 
minMaxKeys[0], minMaxKeys[1]));
+      } catch (MetadataNotFoundException me) {
+        LOG.warn("Unable to find range metadata in file :" + pf);
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+      }
+    }, Math.max(partitionPathFileIDList.size(), 1));
+  }
+
+  /**
+   * Get BloomIndexFileInfo for all the latest base files for the requested 
partitions.
+   *
+   * @param partitions  - List of partitions to get the base files for
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie Table
+   * @return List of partition and file column range info pairs
+   */
+  private List<Pair<String, BloomIndexFileInfo>> getFileInfoForLatestBaseFiles(
+      List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
+    List<Pair<String, String>> partitionPathFileIDList = 
getLatestBaseFilesForAllPartitions(partitions, context,
+        hoodieTable).stream()
+        .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
+        .collect(toList());
+    return partitionPathFileIDList.stream()
+        .map(pf -> Pair.of(pf.getKey(), new 
BloomIndexFileInfo(pf.getValue()))).collect(toList());
+  }
+
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved 
files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be 
loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   * @return List of partition and file column range info pairs
+   */
+  protected List<Pair<String, BloomIndexFileInfo>> 
loadColumnRangesFromMetaIndex(
+      List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
+    // also obtain file ranges, if range pruning is enabled
+    context.setJobStatus(this.getClass().getName(), "Load meta index key 
ranges for file slices");
+
+    final String keyField = 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+    return context.flatMap(partitions, partitionName -> {
+      List<String> partitionFileNameList = 
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,

Review comment:
       is this also coming from the metadata table itself?
   
   

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
##########
@@ -150,18 +152,30 @@ protected void commit(HoodieData<HoodieRecord> 
hoodieDataRecords, String partiti
 
   /**
    * Tag each record with the location in the given partition.
-   *
+   * <p>
    * The record is tagged with respective file slice's location based on its 
record key.
    */
-  private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String 
partitionName, int numFileGroups) {
-    List<FileSlice> fileSlices = 
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
partitionName);
-    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, 
String.format("Invalid number of file groups: found=%d, required=%d", 
fileSlices.size(), numFileGroups));
-
-    return records.stream().map(r -> {
-      FileSlice slice = 
fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
 numFileGroups));
-      final String instantTime = slice.isEmpty() ? "I" : "U";
-      r.setCurrentLocation(new HoodieRecordLocation(instantTime, 
slice.getFileId()));
-      return r;
-    }).collect(Collectors.toList());
+  private List<HoodieRecord> prepRecords(Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap) {

Review comment:
       Can we move this function to the base class as it does not seem to have 
any flink specific handling? 
   
   HoodieData allows iterating over all records for tagging location.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -460,7 +478,7 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext 
engineContext, Hoodi
         .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
 
     initTableMetadata();
-    initializeFileGroups(dataMetaClient, MetadataPartitionType.FILES, 
createInstantTime, 1);
+    initializeEnabledFileGroups(dataMetaClient, createInstantTime);

Review comment:
       Mght be better to do this when bootstrapping each individual type. For 
existing tables, it may be that the various partition types will be enabled one 
at a time. Either way, we will need to handle enabling after metadata table has 
already been created so having a single way to initialize may save duplication.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -1444,6 +1444,14 @@ public boolean useBloomIndexBucketizedChecking() {
     return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
   }
 
+  public boolean isMetadataIndexBloomFilterEnabled() {

Review comment:
       More descriptive if you rename to isMetadataBloomFilterIndexEnabled
   
   It is a BloomFilterIndex so moving the work Index to the end.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
##########
@@ -31,8 +33,8 @@
   protected final FileSystem fs;
   protected final HoodieTable<T, I, K, O> hoodieTable;
 
-  HoodieIOHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, 
I, K, O> hoodieTable) {
-    this.instantTime = instantTime;
+  HoodieIOHandle(HoodieWriteConfig config, Option<String> instantTime, 
HoodieTable<T, I, K, O> hoodieTable) {
+    this.instantTime = instantTime.orElse(StringUtils.EMPTY_STRING);

Review comment:
       Does it have any side effects for the various handles which implement 
HoodieIOHandle? What does an empty instantTime mean for hoodie handles?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -128,8 +130,21 @@
     this.dataWriteConfig = writeConfig;
     this.engineContext = engineContext;
     this.hadoopConf = new SerializableConfiguration(hadoopConf);
+    this.metrics = Option.empty();
+    this.enabledPartitionTypes = new ArrayList<>();
 
     if (writeConfig.isMetadataTableEnabled()) {
+      this.enabledPartitionTypes.add(MetadataPartitionType.FILES);
+      if (writeConfig.getMetadataConfig().isMetadataIndexBloomFilterEnabled()) 
{
+        MetadataPartitionType.BLOOM_FILTERS.setFileGroupCount(

Review comment:
       This has two problems that I see:
   1. What if once the user has created the file groups, the config setting is 
changed? 
   2. How does the user find the correct setting to be begin with?
   
   I suggest that:
   1. Once the fiel groups count has been used to initialize the file groups, 
we dont allow it to be changed. Hence, we get that value from the actual 
partitions in metadata table rather than from configs.
   2. This also frees the user from having to specify a value to initialize the 
file group. I see no easy to estimate the correct value when there are a large 
number of files in an existing dataset.

##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -30,27 +30,108 @@
             "doc": "Type of the metadata record",
             "type": "int"
         },
-        {   "name": "filesystemMetadata",
+        {
             "doc": "Contains information about partitions and files within the 
dataset",
-            "type": ["null", {
-               "type": "map",
-               "values": {
+            "name": "filesystemMetadata",
+            "type": [
+                "null",
+                {
+                    "type": "map",
+                    "values": {
+                        "type": "record",
+                        "name": "HoodieMetadataFileInfo",
+                        "fields": [
+                            {
+                                "name": "size",
+                                "type": "long",
+                                "doc": "Size of the file"
+                            },
+                            {
+                                "name": "isDeleted",
+                                "type": "boolean",
+                                "doc": "True if this file has been deleted"
+                            }
+                        ]
+                    }
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of bloom filters for all data files in the 
user table",
+            "name": "BloomFilterMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file bloom filter details",
+                    "name": "HoodieMetadataBloomFilter",
                     "type": "record",
-                    "name": "HoodieMetadataFileInfo",
                     "fields": [
                         {
-                            "name": "size",
-                            "type": "long",
-                            "doc": "Size of the file"
+                            "doc": "Bloom filter type code",
+                            "name": "type",
+                            "type": "string"

Review comment:
       can this be int? (doc says bloom filter type code so probably an int) 
else change the doc to bloom filter type.

##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -30,27 +30,108 @@
             "doc": "Type of the metadata record",
             "type": "int"
         },
-        {   "name": "filesystemMetadata",
+        {
             "doc": "Contains information about partitions and files within the 
dataset",
-            "type": ["null", {
-               "type": "map",
-               "values": {
+            "name": "filesystemMetadata",
+            "type": [
+                "null",
+                {
+                    "type": "map",
+                    "values": {
+                        "type": "record",
+                        "name": "HoodieMetadataFileInfo",
+                        "fields": [
+                            {
+                                "name": "size",
+                                "type": "long",
+                                "doc": "Size of the file"
+                            },
+                            {
+                                "name": "isDeleted",
+                                "type": "boolean",
+                                "doc": "True if this file has been deleted"
+                            }
+                        ]
+                    }
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of bloom filters for all data files in the 
user table",
+            "name": "BloomFilterMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file bloom filter details",
+                    "name": "HoodieMetadataBloomFilter",
                     "type": "record",
-                    "name": "HoodieMetadataFileInfo",
                     "fields": [
                         {
-                            "name": "size",
-                            "type": "long",
-                            "doc": "Size of the file"
+                            "doc": "Bloom filter type code",
+                            "name": "type",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Instant timestamp when this metadata was 
created/updated",
+                            "name": "timestamp",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Bloom filter binary byte array",
+                            "name": "bloomFilter",
+                            "type": "bytes"
+                        },
+                        {
+                            "doc": "Bloom filter entry valid/deleted flag",
+                            "name": "isDeleted",
+                            "type": "boolean"
+                        }
+                    ]
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of column ranges for all data files in the 
user table",
+            "name": "ColumnStatsMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file column ranges details",
+                    "name": "HoodieColumnStats",
+                    "type": "record",
+                    "fields": [
+                        {
+                            "doc": "Minimum value in the range. Based on user 
data table schema, we can convert this to appropriate type",
+                            "name": "minValue",
+                            "type": [
+                                "null",
+                                "string"
+                            ]
+                        },
+                        {
+                            "doc": "Maximum value in the range. Based on user 
data table schema, we can convert it to appropriate type",
+                            "name": "maxValue",
+                            "type": [
+                                "null",
+                                "string"
+                            ]
+                        },
+                        {
+                            "doc": "Maximum value in the range. Based on user 
data table schema, we can convert it to appropriate type",
+                            "name": "nullCount",

Review comment:
       How is nullCount useful for column stats filtering?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -171,17 +172,34 @@ protected void commit(HoodieData<HoodieRecord> 
hoodieDataRecords, String partiti
 
   /**
    * Tag each record with the location in the given partition.
-   *
+   * <p>
    * The record is tagged with respective file slice's location based on its 
record key.
    */
-  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, 
String partitionName, int numFileGroups) {
-    List<FileSlice> fileSlices = 
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
partitionName);
-    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, 
String.format("Invalid number of file groups: found=%d, required=%d", 
fileSlices.size(), numFileGroups));
-
-    return recordsRDD.map(r -> {
-      FileSlice slice = 
fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
 numFileGroups));
-      r.setCurrentLocation(new 
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
-      return r;
-    });
+  private JavaRDD<HoodieRecord> prepRecords(Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap) {
+    // The result set
+    JavaRDD<HoodieRecord> rddAllPartitionRecords = ((HoodieSparkEngineContext) 
engineContext).getJavaSparkContext().emptyRDD();
+
+    for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> entry : 
partitionRecordsMap.entrySet()) {
+      final String partitionName = entry.getKey().getPartitionPath();
+      final int fileGroupCount = entry.getKey().getFileGroupCount();

Review comment:
       I feel this should not be required to be specified simply because it is 
hard to correctly estimate it across a wide range of datasets. 
   
   We already have the invariant that this value should always match the file 
group count found via getPartitionLatestFileSlices().

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -63,7 +72,9 @@
   // Directory used for Spillable Map when merging records
   protected final String spillableMapDirectory;
 
-  protected boolean enabled;
+  protected boolean isMetadataTableEnabled;
+  protected boolean isMetaIndexBloomFilterEnabled = false;

Review comment:
       1. This will be hard to keep updating when more and more indexes are 
added. 
   2. I didnt see where these are set to true.
   3. Just enabling them is not enough as the bootstrap should have also taken 
place.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -157,6 +197,26 @@ public boolean enabled() {
     return getBoolean(ENABLE);
   }
 
+  public boolean isMetadataIndexBloomFilterEnabled() {
+    return getBooleanOrDefault(ENABLE_METADATA_INDEX_BLOOM_FILTER);
+  }
+
+  public boolean isMetadataIndexColumnStatsEnabled() {
+    return getBooleanOrDefault(ENABLE_METADATA_INDEX_COLUMN_STATS);
+  }
+
+  public boolean isMetadataIndexColumnStatsForAllColumns() {

Review comment:
       suffix of Enabled

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -199,6 +259,16 @@ public Builder enable(boolean enable) {
       return this;
     }
 
+    public Builder withMetadataIndexBloomFilter(boolean enable) {

Review comment:
       Since this is MetadataConfig, it is implied that all of these are 
metadata related. So you may drop the word "Metadata" from these function names 
to shorten them.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -157,6 +197,26 @@ public boolean enabled() {
     return getBoolean(ENABLE);
   }
 
+  public boolean isMetadataIndexBloomFilterEnabled() {

Review comment:
       Since this is MetadataConfig, it is implied that all of these are 
metadata related. So you may drop the word "Metadata" from these function names 
to shorten them.

##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -30,27 +30,108 @@
             "doc": "Type of the metadata record",
             "type": "int"
         },
-        {   "name": "filesystemMetadata",
+        {
             "doc": "Contains information about partitions and files within the 
dataset",
-            "type": ["null", {
-               "type": "map",
-               "values": {
+            "name": "filesystemMetadata",
+            "type": [
+                "null",
+                {
+                    "type": "map",
+                    "values": {
+                        "type": "record",
+                        "name": "HoodieMetadataFileInfo",
+                        "fields": [
+                            {
+                                "name": "size",
+                                "type": "long",
+                                "doc": "Size of the file"
+                            },
+                            {
+                                "name": "isDeleted",
+                                "type": "boolean",
+                                "doc": "True if this file has been deleted"
+                            }
+                        ]
+                    }
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of bloom filters for all data files in the 
user table",
+            "name": "BloomFilterMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file bloom filter details",
+                    "name": "HoodieMetadataBloomFilter",
                     "type": "record",
-                    "name": "HoodieMetadataFileInfo",
                     "fields": [
                         {
-                            "name": "size",
-                            "type": "long",
-                            "doc": "Size of the file"
+                            "doc": "Bloom filter type code",
+                            "name": "type",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Instant timestamp when this metadata was 
created/updated",
+                            "name": "timestamp",

Review comment:
       Schema evolution allows you to add such field in the future too.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,12 +156,119 @@ protected BaseTableMetadata(HoodieEngineContext 
engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final String partitionName, final 
String fileName)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta index for bloom filters is disabled!");
+      return Option.empty();
+    }
+
+    final Pair<String, String> partitionFileName = Pair.of(partitionName, 
fileName);
+    Map<Pair<String, String>, ByteBuffer> bloomFilters = 
getBloomFilters(Collections.singletonList(partitionFileName));
+    if (bloomFilters.isEmpty()) {
+      LOG.error("Meta index: missing bloom filter for partition: " + 
partitionName + ", file: " + fileName);
+      return Option.empty();
+    }
+
+    ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+    return Option.of(bloomFilters.get(partitionFileName));
+  }
+
+  @Override
+  public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta index for bloom filter is disabled!");
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+    Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+    partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+          final String bloomKey = new 
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()
+              .concat(new 
FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString());
+          partitionIDFileIDSortedStrings.add(bloomKey);
+          fileToKeyMap.put(bloomKey, partitionNameFileNamePair);
+        }
+    );
+
+    List<String> partitionIDFileIDStrings = new 
ArrayList<>(partitionIDFileIDSortedStrings);
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
hoodieRecordList =
+        getRecordsByKeys(partitionIDFileIDStrings, 
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, 
timer.endTimer()));

Review comment:
       This is the total time to lookup N bloom filters. We should probably be 
saving the "Average" time here so that this value is useful for comparison 
against runs.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
##########
@@ -35,6 +37,14 @@
 
   public Set<String> filterRowKeys(Set<String> candidateRowKeys);
 
+  default Map<String, R> getRecordsByKeys(TreeSet<String> 
sortedCandidateRowKeys) throws IOException {

Review comment:
       Sorting keys is only useful for HFileReader. Since this is an interface, 
it should only impose the common functionality. I feel List<String> is more 
suited here and HFileReader should internally sort the keys.
   
   Additionally, wherever these keys are being read from will most probably be 
returning a List<String>. So there is already a step somewhere to create a 
TreeSet out of a List<> which is an additional copy. So having HFileReader sort 
internally is not worse in any way and seems more cleaner to me.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to