manojpec commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r786387988



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,6 +156,111 @@ protected BaseTableMetadata(HoodieEngineContext 
engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final String partitionName, final 
String fileName)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta index for bloom filters is disabled!");
+      return Option.empty();
+    }
+
+    final Pair<String, String> partitionFileName = Pair.of(partitionName, 
fileName);
+    Map<Pair<String, String>, ByteBuffer> bloomFilters = 
getBloomFilters(Collections.singletonList(partitionFileName));
+    if (bloomFilters.isEmpty()) {
+      LOG.error("Meta index: missing bloom filter for partition: " + 
partitionName + ", file: " + fileName);
+      return Option.empty();
+    }
+
+    ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+    return Option.of(bloomFilters.get(partitionFileName));
+  }
+
+  @Override
+  public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta index for bloom filter is disabled!");
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+    Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+    partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+          final String bloomKey = new 
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()
+              .concat(new 
FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString());
+          partitionIDFileIDSortedStrings.add(bloomKey);
+          fileToKeyMap.put(bloomKey, partitionNameFileNamePair);
+        }
+    );
+
+    List<String> partitionIDFileIDStrings = new 
ArrayList<>(partitionIDFileIDSortedStrings);
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
hoodieRecordList =
+        getRecordsByKeys(partitionIDFileIDStrings, 
MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, 
timer.endTimer()));
+
+    Map<Pair<String, String>, ByteBuffer> partitionFileToBloomFilterMap = new 
HashMap<>();
+    for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry 
: hoodieRecordList) {
+      if (entry.getRight().isPresent()) {
+        final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
+            entry.getRight().get().getData().getBloomFilterMetadata();
+        if (bloomFilterMetadata.isPresent()) {
+          if (!bloomFilterMetadata.get().getIsDeleted()) {
+            
ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
+            
partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()), 
bloomFilterMetadata.get().getBloomFilter());
+          }
+        } else {
+          LOG.error("Meta index bloom filter missing for: " + 
fileToKeyMap.get(entry.getLeft()));
+        }
+      }
+    }
+    return partitionFileToBloomFilterMap;
+  }
+
+  @Override
+  public Map<Pair<String, String>, HoodieColumnStats> getColumnStats(final 
List<Pair<String, String>> partitionNameFileNameList, final String columnName)
+      throws HoodieMetadataException {
+    if (!isMetaIndexColumnStatsEnabled) {
+      LOG.error("Meta index for column stats is disabled!");
+      return Collections.emptyMap();
+    }
+
+    Map<String, Pair<String, String>> columnStatKeyToFileNameMap = new 
HashMap<>();
+    List<String> columnStatKeys = new ArrayList<>();
+    final String columnIndexStr = new 
ColumnIndexID(columnName).asBase64EncodedString();
+    for (Pair<String, String> partitionNameFileNamePair : 
partitionNameFileNameList) {
+      final String columnStatIndexKey = columnIndexStr
+          .concat(new 
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString())
+          .concat(new 
FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString());
+      columnStatKeys.add(columnStatIndexKey);

Review comment:
       HoodieBloomIndex line 213 sorts the keys before passing them in here. But we 
don't want to assume the input keys are already sorted, so it is fixed here as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to