yihua commented on code in PR #6432:
URL: https://github.com/apache/hudi/pull/6432#discussion_r951958281


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BloomIndexFileInfo.java:
##########
@@ -27,19 +27,20 @@
 public class BloomIndexFileInfo implements Serializable {
 
   private final String fileId;
-
+  private final String filename;

Review Comment:
   Here we store the filename to avoid recomputing the same piece of 
information across stages.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java:
##########
@@ -121,12 +121,14 @@ private HoodiePairData<HoodieKey, HoodieRecordLocation> 
lookupIndex(
 
     // Step 2: Load all involved files as <Partition, filename> pairs
     List<Pair<String, BloomIndexFileInfo>> fileInfoList = 
getBloomIndexFileInfoForPartitions(context, hoodieTable, 
affectedPartitionPathList);
-    final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
-        fileInfoList.stream().collect(groupingBy(Pair::getLeft, 
mapping(Pair::getRight, toList())));
+    // partition -> {file ID -> BloomIndexFileInfo instance}
+    final Map<String, Map<String, BloomIndexFileInfo>> partitionToFileInfo = 
fileInfoList.stream()

Review Comment:
   Switching from List to Map for each value so that the filename can be 
retrieved by file ID in O(1) time.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java:
##########
@@ -82,25 +87,33 @@ public HoodiePairData<HoodieKey, HoodieRecordLocation> 
findMatchingFilesForRecor
     if (config.getBloomIndexUseMetadata()
         && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
         .contains(BLOOM_FILTERS.getPartitionPath())) {
-      // Step 1: Sort by file id
-      JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
-          fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism);
+      // Step 1: Sort by bloom filter key (Hash ID of partition and file name) 
in metadata table
+      JavaRDD<Tuple2<Tuple2<String, String>, HoodieKey>> 
sortedFileIdAndKeyPairs =
+          fileComparisonsRDD
+              .sortBy(t -> HoodieMetadataPayload.getBloomFilterIndexKey(
+                  new PartitionIndexID(t._1._1), new FileIndexID(t._1._2)), 
true, joinParallelism);
 
       // Step 2: Use bloom filter to filter and the actual log file to get the 
record location
-      keyLookupResultRDD = sortedFileIdAndKeyPairs.mapPartitionsWithIndex(
-          new HoodieMetadataBloomIndexCheckFunction(hoodieTable), true);
+      keyLookupResultRDD = 
sortedFileIdAndKeyPairs.coalesce(config.getBloomIndexMetadataReadParallelism())

Review Comment:
   This limits the parallelism of bloom filter fetching.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java:
##########
@@ -121,12 +121,14 @@ private HoodiePairData<HoodieKey, HoodieRecordLocation> 
lookupIndex(
 
     // Step 2: Load all involved files as <Partition, filename> pairs
     List<Pair<String, BloomIndexFileInfo>> fileInfoList = 
getBloomIndexFileInfoForPartitions(context, hoodieTable, 
affectedPartitionPathList);
-    final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
-        fileInfoList.stream().collect(groupingBy(Pair::getLeft, 
mapping(Pair::getRight, toList())));
+    // partition -> {file ID -> BloomIndexFileInfo instance}
+    final Map<String, Map<String, BloomIndexFileInfo>> partitionToFileInfo = 
fileInfoList.stream()
+        .collect(groupingBy(Pair::getLeft, toMap(entry -> 
entry.getRight().getFileId(), Pair::getRight)));
 
     // Step 3: Obtain a HoodieData, for each incoming record, that already 
exists, with the file id,
     // that contains it.
-    HoodieData<Pair<String, HoodieKey>> fileComparisonPairs =
+    // Each entry: ((File ID, Filename), HoodieKey instance)
+    HoodieData<Pair<Pair<String, String>, HoodieKey>> fileComparisonPairs =

Review Comment:
   The file names are passed down along the DAG so that they don't need to 
be recomputed at each stage.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java:
##########
@@ -202,41 +219,57 @@ private List<Pair<String, BloomIndexFileInfo>> 
getFileInfoForLatestBaseFiles(
    * @param partitions  - List of partitions for which column stats need to be 
loaded
    * @param context     - Engine context
    * @param hoodieTable - Hoodie table
+   * @param parallelism - Parallelism for reading column stats from metadata 
table
    * @return List of partition and file column range info pairs
    */
   protected List<Pair<String, BloomIndexFileInfo>> 
loadColumnRangesFromMetaIndex(
-      List<String> partitions, final HoodieEngineContext context, final 
HoodieTable<?, ?, ?, ?> hoodieTable) {
+      List<String> partitions, final HoodieEngineContext context,
+      final HoodieTable<?, ?, ?, ?> hoodieTable, int parallelism) {
     // also obtain file ranges, if range pruning is enabled
     context.setJobStatus(this.getClass().getName(), "Load meta index key 
ranges for file slices: " + config.getTableName());
 
     final String keyField = 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
-    return context.flatMap(partitions, partitionName -> {
-      // Partition and file name pairs
-      List<Pair<String, String>> partitionFileNameList = 
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
-              hoodieTable).stream().map(baseFile -> Pair.of(partitionName, 
baseFile.getFileName()))
-          .sorted()
-          .collect(toList());
-      if (partitionFileNameList.isEmpty()) {
-        return Stream.empty();
-      }
-      try {
-        Map<Pair<String, String>, HoodieMetadataColumnStats> 
fileToColumnStatsMap =
-            
hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField);
-        List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
-        for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry 
: fileToColumnStatsMap.entrySet()) {
-          result.add(Pair.of(entry.getKey().getLeft(),
-              new BloomIndexFileInfo(
-                  FSUtils.getFileId(entry.getKey().getRight()),
-                  // NOTE: Here we assume that the type of the primary key 
field is string
-                  (String) 
unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
-                  (String) 
unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
-              )));
-        }
-        return result.stream();
-      } catch (MetadataNotFoundException me) {
-        throw new HoodieMetadataException("Unable to find column range 
metadata for partition:" + partitionName, me);
-      }
-    }, Math.max(partitions.size(), 1));
+    return context.parallelize(partitions, 
Math.max(Math.min(partitions.size(), parallelism), 1))

Review Comment:
   This limits the parallelism of column stats fetching from the metadata table.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to