alexeykudinkin commented on code in PR #7642:
URL: https://github.com/apache/hudi/pull/7642#discussion_r1089551108


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java:
##########
@@ -115,27 +181,124 @@ public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys
   /**
    * Compute the estimated number of bloom filter comparisons to be performed 
on each file group.
    */
-  private Map<String, Long> computeComparisonsPerFileGroup(
+  private Map<HoodieFileGroupId, Long> computeComparisonsPerFileGroup(
       final HoodieWriteConfig config,
       final Map<String, Long> recordsPerPartition,
       final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
-      final JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD,
+      final JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD,
       final HoodieEngineContext context) {
-    Map<String, Long> fileToComparisons;
+    Map<HoodieFileGroupId, Long> fileToComparisons;
     if (config.getBloomIndexPruneByRanges()) {
       // we will just try exploding the input and then count to determine 
comparisons
       // FIX(vc): Only do sampling here and extrapolate?
       context.setJobStatus(this.getClass().getSimpleName(), "Compute all 
comparisons needed between records and files: " + config.getTableName());
-      fileToComparisons = fileComparisonsRDD.mapToPair(t -> t).countByKey();
+      fileToComparisons = fileComparisonsRDD.countByKey();
     } else {
       fileToComparisons = new HashMap<>();
-      partitionToFileInfo.forEach((key, value) -> {
-        for (BloomIndexFileInfo fileInfo : value) {
+      partitionToFileInfo.forEach((partitionPath, fileInfos) -> {
+        for (BloomIndexFileInfo fileInfo : fileInfos) {
           // each file needs to be compared against all the records coming 
into the partition
-          fileToComparisons.put(fileInfo.getFileId(), 
recordsPerPartition.get(key));
+          fileToComparisons.put(
+              new HoodieFileGroupId(partitionPath, fileInfo.getFileId()), 
recordsPerPartition.get(partitionPath));
         }
       });
     }
     return fileToComparisons;
   }
+
+  private static HoodieTableFileSystemView getBaseFileOnlyView(HoodieTable<?, 
?, ?, ?> hoodieTable, Collection<String> partitionPaths) {
+    try {
+      List<String> fullPartitionPaths = partitionPaths.stream()
+          .map(partitionPath ->
+              String.format("%s/%s", 
hoodieTable.getMetaClient().getBasePathV2(), partitionPath))
+          .collect(Collectors.toList());
+
+      FileStatus[] allFiles =
+          
hoodieTable.getMetadataTable().getAllFilesInPartitions(fullPartitionPaths).values().stream()
+              .flatMap(Arrays::stream)
+              .toArray(FileStatus[]::new);
+
+      return new HoodieTableFileSystemView(hoodieTable.getMetaClient(), 
hoodieTable.getActiveTimeline(), allFiles);
+    } catch (IOException e) {
+      LOG.error(String.format("Failed to fetch all files for partitions (%s)", 
partitionPaths));
+      throw new HoodieIOException("Failed to fetch all files for partitions", 
e);
+    }
+  }
+
+  static class AffineBloomIndexFileGroupPartitioner extends Partitioner {
+
+    private final Broadcast<HoodieTableFileSystemView> 
latestBaseFilesBroadcast;
+
+    // TODO(HUDI-5619) remove when addressed
+    private final Map<String, Map<String, String>> cachedLatestBaseFileNames =
+        new HashMap<>(16);

Review Comment:
   We want to skip resizing for the first few expansions of the map, but we 
don't want to pre-allocate too much memory that might never be used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to