yihua commented on code in PR #7642:
URL: https://github.com/apache/hudi/pull/7642#discussion_r1087131134


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java:
##########
@@ -210,34 +210,35 @@ protected List<Pair<String, BloomIndexFileInfo>> 
loadColumnRangesFromMetaIndex(
     // also obtain file ranges, if range pruning is enabled
     context.setJobStatus(this.getClass().getName(), "Load meta index key 
ranges for file slices: " + config.getTableName());
 
-    final String keyField = 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
-    return context.flatMap(partitions, partitionName -> {
-      // Partition and file name pairs
-      List<Pair<String, String>> partitionFileNameList = 
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
-              hoodieTable).stream().map(baseFile -> Pair.of(partitionName, 
baseFile.getFileName()))
-          .sorted()
-          .collect(toList());
-      if (partitionFileNameList.isEmpty()) {
-        return Stream.empty();
-      }
-      try {
-        Map<Pair<String, String>, HoodieMetadataColumnStats> 
fileToColumnStatsMap =
-            
hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField);
-        List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
-        for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry 
: fileToColumnStatsMap.entrySet()) {
-          result.add(Pair.of(entry.getKey().getLeft(),
-              new BloomIndexFileInfo(
-                  FSUtils.getFileId(entry.getKey().getRight()),
-                  // NOTE: Here we assume that the type of the primary key 
field is string
-                  (String) 
unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
-                  (String) 
unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
-              )));
-        }
-        return result.stream();
-      } catch (MetadataNotFoundException me) {
-        throw new HoodieMetadataException("Unable to find column range 
metadata for partition:" + partitionName, me);
-      }
-    }, Math.max(partitions.size(), 1));
+    String keyField = 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+
+    // Partition and file name pairs
+    List<Pair<String, String>> partitionFileNameList =
+        HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, 
context, hoodieTable).stream()
+            .map(partitionBaseFilePair -> 
Pair.of(partitionBaseFilePair.getLeft(), 
partitionBaseFilePair.getRight().getFileName()))
+            .sorted()
+            .collect(toList());
+
+    if (partitionFileNameList.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap =

Review Comment:
   One note here is that this is now storing column stats of the key field of 
all files in memory on the driver, which can be slightly larger than the 
returned list.  Previously the column stats of the key field of files in one 
partition are stored on the executor side.  Might not be an issue though. 



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/AbstractIterator.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * TODO upstream

Review Comment:
   nit: fix the docs



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.broadcast.Broadcast;
+import scala.Tuple2;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of the function probing filtered in candidate keys provided 
in
+ * {@link HoodieBloomFilterProbingResult} w/in corresponding files identified 
by {@link HoodieFileGroupId}
+ * to validate whether the record w/ the provided key is indeed persisted in it
+ */
+public class HoodieFileProbingFunction implements
+    FlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, 
HoodieBloomFilterProbingResult>>, List<HoodieKeyLookupResult>> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieFileProbingFunction.class);
+
+  // Assuming each file bloom filter takes up 512K, sizing the max file count
+  // per batch so that the total fetched bloom filters would not cross 128 MB.
+  private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256;
+
+  private final Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast;
+  private final SerializableConfiguration hadoopConf;
+
+  public HoodieFileProbingFunction(Broadcast<HoodieTableFileSystemView> 
baseFileOnlyViewBroadcast,
+                                   SerializableConfiguration hadoopConf) {
+    this.baseFileOnlyViewBroadcast = baseFileOnlyViewBroadcast;
+    this.hadoopConf = hadoopConf;
+  }
+
+  @Override
+  public Iterator<List<HoodieKeyLookupResult>> 
call(Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> 
tuple2Iterator) throws Exception {
+    return new BloomIndexLazyKeyCheckIterator(tuple2Iterator);
+  }
+
+  private class BloomIndexLazyKeyCheckIterator
+      extends LazyIterableIterator<Tuple2<HoodieFileGroupId, 
HoodieBloomFilterProbingResult>, List<HoodieKeyLookupResult>> {
+
+    public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<HoodieFileGroupId, 
HoodieBloomFilterProbingResult>> tuple2Iterator) {
+      super(tuple2Iterator);
+    }
+
+    @Override
+    protected List<HoodieKeyLookupResult> computeNext() {
+      // Partition path and file name pair to list of keys
+      final Map<Pair<String, String>, HoodieBloomFilterProbingResult> 
fileToLookupResults = new HashMap<>();
+      final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+
+      while (inputItr.hasNext()) {
+        Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult> entry = 
inputItr.next();
+        final String partitionPath = entry._1.getPartitionPath();
+        final String fileId = entry._1.getFileId();
+
+        if (!fileIDBaseFileMap.containsKey(fileId)) {
+          Option<HoodieBaseFile> baseFile =
+              
baseFileOnlyViewBroadcast.getValue().getLatestBaseFile(partitionPath, fileId);
+          if (!baseFile.isPresent()) {
+            throw new HoodieIndexException("Failed to find the base file for 
partition: " + partitionPath
+                + ", fileId: " + fileId);
+          }
+
+          fileIDBaseFileMap.put(fileId, baseFile.get());
+        }
+
+        fileToLookupResults.putIfAbsent(Pair.of(partitionPath, 
fileIDBaseFileMap.get(fileId).getFileName()), entry._2);
+
+        if (fileToLookupResults.size() > 
BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
+          break;

Review Comment:
   Should an exception be thrown here to indicate that not all files are 
checked?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java:
##########
@@ -66,42 +80,94 @@ public static SparkHoodieBloomIndexHelper getInstance() {
   public HoodiePairData<HoodieKey, HoodieRecordLocation> 
findMatchingFilesForRecordKeys(
       HoodieWriteConfig config, HoodieEngineContext context, HoodieTable 
hoodieTable,
       HoodiePairData<String, String> partitionRecordKeyPairs,
-      HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
+      HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
       Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
       Map<String, Long> recordsPerPartition) {
-    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
-        HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
-            .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
 
     int inputParallelism = 
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
-    int joinParallelism = Math.max(inputParallelism, 
config.getBloomIndexParallelism());
-    LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: 
${"
-        + config.getBloomIndexParallelism() + "}");
+    int configuredBloomIndexParallelism = config.getBloomIndexParallelism();
 
+    // NOTE: Target parallelism could be overridden by the config
+    int targetParallelism =

Review Comment:
   note to myself: if the configured bloom index parallelism is smaller than 
the input parallelism, before this PR, we take the input parallelism; now, we 
take the configured bloom index parallelism.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java:
##########
@@ -289,11 +290,13 @@ HoodieData<Pair<String, HoodieKey>> 
explodeRecordsWithFileComparisons(
       String recordKey = partitionRecordKeyPair.getRight();
       String partitionPath = partitionRecordKeyPair.getLeft();
 
-      return indexFileFilter.getMatchingFilesAndPartition(partitionPath, 
recordKey).stream()
-          .map(partitionFileIdPair -> (Pair<String, HoodieKey>) new 
ImmutablePair<>(partitionFileIdPair.getRight(),
-              new HoodieKey(recordKey, partitionPath)))
-          .collect(Collectors.toList());
-    }).flatMap(List::iterator);
+      return indexFileFilter.getMatchingFilesAndPartition(partitionPath, 
recordKey)

Review Comment:
   This affects the existing code path for Bloom Index without using the 
metadata table.  @alexeykudinkin have you compared the performance of Bloom 
Index vs master or 0.12.2 when the metadata table is not used for fetching 
bloom filters?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.broadcast.Broadcast;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Implementation of the function that probing Bloom Filters of individual 
files verifying
+ * whether particular record key could be stored in the latest file-slice of 
the file-group
+ * identified by the {@link HoodieFileGroupId}
+ */
+public class HoodieMetadataBloomFilterProbingFunction implements
+    PairFlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, String>>, 
HoodieFileGroupId, HoodieBloomFilterProbingResult> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieMetadataBloomFilterProbingFunction.class);
+
+  // Assuming each file bloom filter takes up 512K, sizing the max file count
+  // per batch so that the total fetched bloom filters would not cross 128 MB.
+  private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256;
+  private final HoodieTable hoodieTable;
+
+  private final Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast;
+
+  /**
+   * NOTE: It's critical for this ctor to accept {@link HoodieTable} to make 
sure that it uses
+   *       broadcast-ed instance of {@link HoodieBackedTableMetadata} 
internally, instead of
+   *       one being serialized and deserialized for _every_ task individually
+   *
+   * NOTE: We pass in broadcasted {@link HoodieTableFileSystemView} to make 
sure it's materialized
+   *       on executor once
+   */
+  public 
HoodieMetadataBloomFilterProbingFunction(Broadcast<HoodieTableFileSystemView> 
baseFileOnlyViewBroadcast,
+                                                  HoodieTable hoodieTable) {
+    this.baseFileOnlyViewBroadcast = baseFileOnlyViewBroadcast;
+    this.hoodieTable = hoodieTable;
+  }
+
+  @Override
+  public Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> 
call(Iterator<Tuple2<HoodieFileGroupId, String>> tuple2Iterator) throws 
Exception {
+    return new FlattenedIterator<>(new 
BloomIndexLazyKeyCheckIterator(tuple2Iterator), Function.identity());
+  }
+
+  private class BloomIndexLazyKeyCheckIterator
+      extends LazyIterableIterator<Tuple2<HoodieFileGroupId, String>, 
Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>>> {
+
+    public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<HoodieFileGroupId, 
String>> tuple2Iterator) {
+      super(tuple2Iterator);
+    }
+
+    @Override
+    protected Iterator<Tuple2<HoodieFileGroupId, 
HoodieBloomFilterProbingResult>> computeNext() {
+      // Partition path and file name pair to list of keys
+      final Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new 
HashMap<>();
+      final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+
+      while (inputItr.hasNext()) {
+        Tuple2<HoodieFileGroupId, String> entry = inputItr.next();
+        String partitionPath = entry._1.getPartitionPath();
+        String fileId = entry._1.getFileId();
+
+        if (!fileIDBaseFileMap.containsKey(fileId)) {
+          Option<HoodieBaseFile> baseFile = 
baseFileOnlyViewBroadcast.getValue().getLatestBaseFile(partitionPath, fileId);
+          if (!baseFile.isPresent()) {
+            throw new HoodieIndexException("Failed to find the base file for 
partition: " + partitionPath
+                + ", fileId: " + fileId);
+          }
+          fileIDBaseFileMap.put(fileId, baseFile.get());
+        }
+
+        fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, 
fileIDBaseFileMap.get(fileId).getFileName()),
+            k -> new ArrayList<>()).add(new HoodieKey(entry._2, 
partitionPath));
+
+        if (fileToKeysMap.size() > 
BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
+          break;

Review Comment:
   Similar here.  Should this throw an exception?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/FlattenedIterator.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import java.util.Iterator;
+import java.util.function.Function;
+
+/**
+ * TODO revisit (simplify)

Review Comment:
   nit: docs



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java:
##########
@@ -119,7 +118,16 @@ public <O> HoodieData<O> 
mapPartitions(SerializableFunction<Iterator<T>, Iterato
 
   @Override
   public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
-    return HoodieJavaRDD.of(rddData.flatMap(e -> func.apply(e)));
+    // NOTE: Unrolling this lambda into a method reference results in 
[[ClassCastException]]
+    //       due to weird interop b/w Scala and Java
+    return HoodieJavaRDD.of(rddData.flatMap(x -> func.apply(x)));
+  }

Review Comment:
   I don't see any change in this part.  What is the intention here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to