alexeykudinkin commented on code in PR #6432:
URL: https://github.com/apache/hudi/pull/6432#discussion_r954187229


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BloomIndexFileInfo.java:
##########
@@ -27,19 +27,20 @@
 public class BloomIndexFileInfo implements Serializable {
 
   private final String fileId;
-
+  private final String filename;
   private final String minRecordKey;
-
   private final String maxRecordKey;
 
-  public BloomIndexFileInfo(String fileId, String minRecordKey, String 
maxRecordKey) {
+  public BloomIndexFileInfo(String fileId, String filename, String 
minRecordKey, String maxRecordKey) {
     this.fileId = fileId;
+    this.filename = filename;

Review Comment:
   nit: `fileName`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BloomIndexFileInfo.java:
##########
@@ -27,19 +27,20 @@
 public class BloomIndexFileInfo implements Serializable {
 
   private final String fileId;
-
+  private final String filename;

Review Comment:
   Do we really need to store both file-id and file-name? I think we can just 
store the file-name, and then convert it to file-id wherever necessary.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java:
##########
@@ -83,37 +89,64 @@ protected void start() {
     @Override
     protected List<HoodieKeyLookupResult> computeNext() {
       // Partition path and file name pair to list of keys
-      final Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new 
HashMap<>();
-      final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+      final Map<Pair<String, String>, List<HoodieKey>> batchFileToKeysMap = 
new HashMap<>();
       final List<HoodieKeyLookupResult> resultList = new ArrayList<>();
+      String lastFileId = null;
+
+      try {
+        // Here we batch process the lookup of bloom filters in metadata table
+        // assuming the partition path and file name pairs are already sorted 
by the corresponding key
+        while (inputItr.hasNext()) {
+          Tuple2<Tuple2<String, String>, HoodieKey> entry = inputItr.next();
+          final String partitionPath = entry._2.getPartitionPath();
+          final String fileId = entry._1._1();
+          final String filename = entry._1._2();
+
+          if (lastFileId == null || !lastFileId.equals(fileId)) {
+            if (processedFileIdSet.contains(fileId)) {
+              LOG.warn(String.format("Fetching the bloom filter for file ID %s 
again.  "
+                  + " The input pairs of file ID and record key are not 
sorted.", fileId));
+            }
+            lastFileId = fileId;
+            processedFileIdSet.add(fileId);
+          }
+
+          batchFileToKeysMap.computeIfAbsent(Pair.of(partitionPath, filename), 
k -> new ArrayList<>()).add(entry._2);
 
-      while (inputItr.hasNext()) {
-        Tuple2<String, HoodieKey> entry = inputItr.next();
-        final String partitionPath = entry._2.getPartitionPath();
-        final String fileId = entry._1;
-        if (!fileIDBaseFileMap.containsKey(fileId)) {
-          Option<HoodieBaseFile> baseFile = 
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
-          if (!baseFile.isPresent()) {
-            throw new HoodieIndexException("Failed to find the base file for 
partition: " + partitionPath
-                + ", fileId: " + fileId);
+          if (batchFileToKeysMap.size() == batchSize) {
+            resultList.addAll(lookupKeysInBloomFilters(batchFileToKeysMap));
+            batchFileToKeysMap.clear();
           }
-          fileIDBaseFileMap.put(fileId, baseFile.get());
         }
-        fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, 
fileIDBaseFileMap.get(fileId).getFileName()),
-            k -> new ArrayList<>()).add(entry._2);
-        if (fileToKeysMap.size() > 
BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
-          break;
+
+        if (batchFileToKeysMap.size() > 0) {
+          resultList.addAll(lookupKeysInBloomFilters(batchFileToKeysMap));
+          batchFileToKeysMap.clear();
         }
+
+        return resultList;
+      } catch (Throwable e) {
+        if (e instanceof HoodieException) {
+          throw e;
+        }
+        throw new HoodieIndexException("Error checking bloom filter using 
metadata table.", e);
       }
-      if (fileToKeysMap.isEmpty()) {
-        return Collections.emptyList();
-      }
+    }
+
+    @Override
+    protected void end() {
+    }
 
-      List<Pair<String, String>> partitionNameFileNameList = new 
ArrayList<>(fileToKeysMap.keySet());
+    private List<HoodieKeyLookupResult> lookupKeysInBloomFilters(
+        Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap) {
+      List<HoodieKeyLookupResult> resultList = new ArrayList<>();
+      List<Pair<String, String>> partitionPathFileNameList = new 
ArrayList<>(fileToKeysMap.keySet());
+      HoodieTimer timer = HoodieTimer.start();
       Map<Pair<String, String>, BloomFilter> fileToBloomFilterMap =
-          
hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);
+          
hoodieTable.getMetadataTable().getBloomFilters(partitionPathFileNameList);
+      LOG.error(String.format("Took %d ms to look up %s bloom filters",

Review Comment:
   This should be logged at debug/info level rather than error



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java:
##########
@@ -53,26 +55,30 @@
  * keys are checked against them.
  */
 public class HoodieMetadataBloomIndexCheckFunction implements
-    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, 
Iterator<List<HoodieKeyLookupResult>>> {
+    Function2<Integer, Iterator<Tuple2<Tuple2<String, String>, HoodieKey>>, 
Iterator<List<HoodieKeyLookupResult>>> {

Review Comment:
   In general, it's better to keep nested hierarchies as flat as possible 
(makes them easier to comprehend and reduces the number of objects needed): in 
this case we could use a single `Tuple3` in lieu of 2 `Tuple2`s



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java:
##########
@@ -121,12 +121,14 @@ private HoodiePairData<HoodieKey, HoodieRecordLocation> 
lookupIndex(
 
     // Step 2: Load all involved files as <Partition, filename> pairs
     List<Pair<String, BloomIndexFileInfo>> fileInfoList = 
getBloomIndexFileInfoForPartitions(context, hoodieTable, 
affectedPartitionPathList);
-    final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
-        fileInfoList.stream().collect(groupingBy(Pair::getLeft, 
mapping(Pair::getRight, toList())));
+    // partition -> {file ID -> BloomIndexFileInfo instance}
+    final Map<String, Map<String, BloomIndexFileInfo>> partitionToFileInfo = 
fileInfoList.stream()
+        .collect(groupingBy(Pair::getLeft, toMap(entry -> 
entry.getRight().getFileId(), Pair::getRight)));
 
     // Step 3: Obtain a HoodieData, for each incoming record, that already 
exists, with the file id,
     // that contains it.
-    HoodieData<Pair<String, HoodieKey>> fileComparisonPairs =
+    // Each entry: ((File ID, Filename), HoodieKey instance)
+    HoodieData<Pair<Pair<String, String>, HoodieKey>> fileComparisonPairs =

Review Comment:
   Same applies here: I don't think we need to propagate both file-id and 
file-name since the latter contains the former (it increases both complexity 
and the amount of data we need to shuffle)



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java:
##########
@@ -83,37 +89,64 @@ protected void start() {
     @Override
     protected List<HoodieKeyLookupResult> computeNext() {
       // Partition path and file name pair to list of keys
-      final Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new 
HashMap<>();
-      final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+      final Map<Pair<String, String>, List<HoodieKey>> batchFileToKeysMap = 
new HashMap<>();
       final List<HoodieKeyLookupResult> resultList = new ArrayList<>();
+      String lastFileId = null;
+
+      try {
+        // Here we batch process the lookup of bloom filters in metadata table
+        // assuming the partition path and file name pairs are already sorted 
by the corresponding key
+        while (inputItr.hasNext()) {
+          Tuple2<Tuple2<String, String>, HoodieKey> entry = inputItr.next();
+          final String partitionPath = entry._2.getPartitionPath();
+          final String fileId = entry._1._1();
+          final String filename = entry._1._2();
+
+          if (lastFileId == null || !lastFileId.equals(fileId)) {
+            if (processedFileIdSet.contains(fileId)) {
+              LOG.warn(String.format("Fetching the bloom filter for file ID %s 
again.  "
+                  + " The input pairs of file ID and record key are not 
sorted.", fileId));
+            }
+            lastFileId = fileId;
+            processedFileIdSet.add(fileId);
+          }
+
+          batchFileToKeysMap.computeIfAbsent(Pair.of(partitionPath, filename), 
k -> new ArrayList<>()).add(entry._2);
 
-      while (inputItr.hasNext()) {
-        Tuple2<String, HoodieKey> entry = inputItr.next();
-        final String partitionPath = entry._2.getPartitionPath();
-        final String fileId = entry._1;
-        if (!fileIDBaseFileMap.containsKey(fileId)) {
-          Option<HoodieBaseFile> baseFile = 
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
-          if (!baseFile.isPresent()) {
-            throw new HoodieIndexException("Failed to find the base file for 
partition: " + partitionPath
-                + ", fileId: " + fileId);
+          if (batchFileToKeysMap.size() == batchSize) {
+            resultList.addAll(lookupKeysInBloomFilters(batchFileToKeysMap));

Review Comment:
   Let's just break in this conditional to avoid duplication



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java:
##########
@@ -121,12 +121,14 @@ private HoodiePairData<HoodieKey, HoodieRecordLocation> 
lookupIndex(
 
     // Step 2: Load all involved files as <Partition, filename> pairs
     List<Pair<String, BloomIndexFileInfo>> fileInfoList = 
getBloomIndexFileInfoForPartitions(context, hoodieTable, 
affectedPartitionPathList);
-    final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
-        fileInfoList.stream().collect(groupingBy(Pair::getLeft, 
mapping(Pair::getRight, toList())));
+    // partition -> {file ID -> BloomIndexFileInfo instance}

Review Comment:
   nit: the "File Id" denomination might be confusing; "file-group id" would 
be more accurate



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to