the-other-tim-brown commented on code in PR #10578:
URL: https://github.com/apache/hudi/pull/10578#discussion_r1468911224


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -122,18 +121,14 @@ public static List<FileSlice> getLatestFileSlicesForPartition(
    * @param hoodieTable instance of {@link HoodieTable} of interest
    * @return the list of Pairs of partition path and fileId
    */
-  public static List<Pair<String, HoodieBaseFile>> getLatestBaseFilesForAllPartitions(final List<String> partitions,
-                                                                                      final HoodieEngineContext context,
-                                                                                      final HoodieTable hoodieTable) {
+  public static HoodieData<Pair<String, HoodieBaseFile>> getLatestBaseFilesForAllPartitions(final HoodieData<String> partitions,

Review Comment:
   Previously, we collected the distinct partition list on the driver and then 
created a new dataset from those partitions to fetch the base files. Now the 
whole flow runs on the executors, and we only collect when range pruning needs 
an aggregated result.
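
   A minimal sketch of the difference, using plain Java streams as a stand-in 
for HoodieData. The class name and the latestBaseFiles helper are hypothetical 
and are not Hudi APIs; the point is only the shape of the pipeline.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PartitionPipelineSketch {

  // Hypothetical stand-in for listing the latest base files of one partition.
  static List<String> latestBaseFiles(String partition) {
    return Arrays.asList(partition + "/base-1.parquet", partition + "/base-2.parquet");
  }

  // Old shape: materialize the distinct partitions first (the "collect on the
  // driver" step), then start a second pipeline from that list.
  static List<String> collectThenFetch(List<String> recordPartitions) {
    List<String> distinctPartitions =
        recordPartitions.stream().distinct().collect(Collectors.toList());
    return distinctPartitions.stream()
        .flatMap(partition -> latestBaseFiles(partition).stream())
        .collect(Collectors.toList());
  }

  // New shape: one lazy pipeline with no intermediate list. Nothing runs until
  // the caller applies a terminal operation such as collect.
  static Stream<String> fetchLazily(Stream<String> recordPartitions) {
    return recordPartitions
        .distinct()
        .flatMap(partition -> latestBaseFiles(partition).stream());
  }
}
```

   Both shapes yield the same base files; the second just defers 
materialization to whoever actually needs the aggregated result.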



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java:
##########
@@ -119,11 +119,11 @@ private HoodiePairData<HoodieKey, HoodieRecordLocation> lookupIndex(
       HoodiePairData<String, String> partitionRecordKeyPairs, final HoodieEngineContext context,
       final HoodieTable hoodieTable) {
     // Step 1: Obtain records per partition, in the incoming records
-    Map<String, Long> recordsPerPartition = partitionRecordKeyPairs.countByKey();
-    List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
+    HoodiePairData<String, Long> recordsPerPartition = partitionRecordKeyPairs.mapToPair(pair -> Pair.of(pair.getLeft(), 1L)).reduceByKey(Long::sum, 1);

Review Comment:
   The records-per-partition count is used in only 1 of the 3 possible flows 
through the bloom filter, and that flow is not enabled by default. My goal here 
was to make the count lazily evaluated and avoid an unnecessary count call in 
the default case.
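
   To illustrate the idea outside of Hudi, here is a small sketch that wraps 
the count in a Supplier so it only runs when a flow asks for it. The class, the 
needsCounts flag, and the countPasses counter are hypothetical and exist only 
to make the laziness observable; the actual change relies on reduceByKey being 
a lazy transformation.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class LazyCountSketch {

  // Counts how many times the per-partition count was actually computed.
  static int countPasses = 0;

  // Describes the count without running it; the work happens on get().
  static Supplier<Map<String, Long>> recordsPerPartition(List<String> partitionPaths) {
    return () -> {
      countPasses++;
      return partitionPaths.stream()
          .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    };
  }

  // Hypothetical lookup with two flows: only one of them needs the counts.
  static String lookup(List<String> partitionPaths, boolean needsCounts) {
    Supplier<Map<String, Long>> counts = recordsPerPartition(partitionPaths);
    if (needsCounts) {
      return "counted " + counts.get().size() + " partitions";
    }
    // Default flow: the supplier is never invoked, so no counting pass runs.
    return "default flow, no count";
  }
}
```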



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -177,27 +172,27 @@ public static <R> HoodieRecord<R> tagRecord(HoodieRecord<R> record, HoodieRecord
    * @param candidateRecordKeys - Candidate keys to filter
    * @return List of pairs of candidate keys and positions that are available in the file
    */
-  public static List<Pair<String, Long>> filterKeysFromFile(Path filePath, List<String> candidateRecordKeys,
+  public static List<Pair<String, Long>> filterKeysFromFile(Path filePath, Set<String> candidateRecordKeys,
                                                             Configuration configuration) throws HoodieIndexException {
+    if (candidateRecordKeys.isEmpty()) {

Review Comment:
   Exit early if there are no candidate record keys. Previously, we created a 
file reader before exiting.
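
   A rough sketch of the guard, with a hypothetical FakeReader standing in for 
the real file reader and a readersOpened counter added only to show that the 
empty case never opens one. None of these names come from Hudi.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class EarlyExitSketch {

  // Tracks how many readers were opened, to make the early exit observable.
  static int readersOpened = 0;

  // Hypothetical stand-in for a file reader that is costly to open.
  static final class FakeReader implements AutoCloseable {
    private final Set<String> keysInFile;

    FakeReader(Set<String> keysInFile) {
      readersOpened++;
      this.keysInFile = keysInFile;
    }

    List<String> filter(Set<String> candidates) {
      List<String> found = new ArrayList<>();
      for (String key : candidates) {
        if (keysInFile.contains(key)) {
          found.add(key);
        }
      }
      return found;
    }

    @Override
    public void close() {
      // Nothing to release in this sketch.
    }
  }

  static List<String> filterKeys(Set<String> keysInFile, Set<String> candidates) {
    // Guard first: with no candidates there is nothing to look up,
    // so return before paying the cost of opening a reader.
    if (candidates.isEmpty()) {
      return Collections.emptyList();
    }
    try (FakeReader reader = new FakeReader(keysInFile)) {
      return reader.filter(candidates);
    }
  }
}
```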


