manojpec commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r789087943



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Spark Function2 implementation for checking bloom filters for the
+ * requested keys from the metadata table index. The bloom filter
+ * checking for keys and the actual file verification for the
+ * candidate keys is done in a batch fashion for all the provided
+ * keys at once.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, Iterator<List<HoodieKeyLookupResult>>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<HoodieKeyLookupResult>> call(Integer integer, Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    List<List<HoodieKeyLookupResult>> resultList = new ArrayList<>();
+    // Partition path and file name pair to list of keys
+    Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+    final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+    while (tuple2Iterator.hasNext()) {
+      Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+      final String partitionPath = entry._2.getPartitionPath();
+      final String fileId = entry._1;
+      if (!fileIDBaseFileMap.containsKey(fileId)) {
+        Option<HoodieBaseFile> baseFile = hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);

Review comment:
   > Is it possible for iterator to have a fileId multiple times in the same task?
   Yes. The caller passes in a list of tuples mapping file ID/name to key, so when many keys fall in the same file, the same file ID repeats in the input list. Here we construct the base file for each file ID only once.
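
To illustrate the once-per-file-ID lookup described above, here is a minimal, self-contained sketch. It is not the code from this PR: `BaseFileLookupSketch`, `groupKeysByBaseFile`, and the string-based stand-ins for `HoodieKey` and `HoodieBaseFile` are hypothetical, and skipping file IDs that have no base file is an assumption made only for the example.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch: plain strings stand in for HoodieKey and HoodieBaseFile.
public class BaseFileLookupSketch {

  /**
   * Groups record keys by base file name, resolving each file ID at most once
   * even when the same file ID repeats in the input.
   * Assumption for this sketch only: file IDs without a base file are skipped.
   */
  static Map<String, List<String>> groupKeysByBaseFile(
      Iterator<Map.Entry<String, String>> fileIdToKey,
      Function<String, Optional<String>> baseFileLookup) {
    // Cache of lookups already performed, including "no base file" results.
    Map<String, Optional<String>> baseFileByFileId = new HashMap<>();
    Map<String, List<String>> keysByBaseFile = new LinkedHashMap<>();
    while (fileIdToKey.hasNext()) {
      Map.Entry<String, String> entry = fileIdToKey.next();
      String fileId = entry.getKey();
      if (!baseFileByFileId.containsKey(fileId)) {
        baseFileByFileId.put(fileId, baseFileLookup.apply(fileId));
      }
      baseFileByFileId.get(fileId).ifPresent(name ->
          keysByBaseFile.computeIfAbsent(name, k -> new ArrayList<>()).add(entry.getValue()));
    }
    return keysByBaseFile;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> input = Arrays.asList(
        new SimpleEntry<>("f1", "k1"),
        new SimpleEntry<>("f2", "k2"),
        new SimpleEntry<>("f1", "k3"));
    int[] lookups = {0};
    // Pretend only file ID "f1" has a base file.
    Function<String, Optional<String>> lookup = id -> {
      lookups[0]++;
      return id.equals("f1") ? Optional.of("f1.parquet") : Optional.<String>empty();
    };
    System.out.println(groupKeysByBaseFile(input.iterator(), lookup)); // {f1.parquet=[k1, k3]}
    System.out.println("lookups: " + lookups[0]);                      // lookups: 2
  }
}
```

Three input tuples trigger only two lookups because `f1` is resolved once and served from the map afterwards. Caching the empty result as well avoids repeating the lookup for file IDs that have no base file.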
   
   > Is it possible at any point that there are no base files in the table? What happens then?
   The bloom filter and column range info are built from the footers of base files, so when there are no base files we have no index for them.
   
   Generally, even a MOR user table starts off with a base file. With no index, or on an index lookup miss, upserts take the insert code path, thereby forcing base file creation. However, I still need to explore the kafka-connect case, which creates log files only. This is an open item.
   
   




