manojpec commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r784650861



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -159,6 +173,71 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
     }
   }
 
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   */
+  List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+      List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    if (config.getBloomIndexPruneByRanges()) {
+      // also obtain file ranges, if range pruning is enabled
+      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for 
file slices (range pruning=on)");
+
+      final String keyField = 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+      return context.flatMap(partitions, new SerializableFunction<String, 
Stream<Pair<String, BloomIndexFileInfo>>>() {
+        @Override
+        public Stream<Pair<String, BloomIndexFileInfo>> apply(String 
partitionName) throws Exception {
+          final String columnIndexID = new 
ColumnIndexID(keyField).asBase64EncodedString();
+          final String partitionIndexID = new 
PartitionIndexID(partitionName).asBase64EncodedString();
+
+          List<Pair<String, String>> partitionFileIdList =
+              HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
+                      hoodieTable).stream().map(baseFile -> 
Pair.of(baseFile.getFileId(), baseFile.getFileName()))
+                  .collect(toList());
+          try {
+            Map<String, String> columnStatKeyToFileIdMap = new HashMap<>();
+            List<String> columnStatKeys = new ArrayList<>();
+            for (Pair<String, String> fileIdFileName : partitionFileIdList) {
+              final String columnStatIndexKey = columnIndexID
+                  .concat(partitionIndexID)
+                  .concat(new 
FileIndexID(fileIdFileName.getLeft()).asBase64EncodedString());
+              columnStatKeys.add(columnStatIndexKey);
+              columnStatKeyToFileIdMap.put(columnStatIndexKey, 
fileIdFileName.getLeft());
+            }
+            Collections.sort(columnStatKeys);
+
+            Map<String, HoodieColumnStats> columnKeyHashToStatMap = hoodieTable
+                .getMetadataTable().getColumnStats(columnStatKeys);
+            List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
+            for (Map.Entry<String, HoodieColumnStats> entry : 
columnKeyHashToStatMap.entrySet()) {
+              result.add(Pair.of(partitionName,
+                  new BloomIndexFileInfo(
+                      columnStatKeyToFileIdMap.get(entry.getKey()),
+                      entry.getValue().getMinValue(),
+                      entry.getValue().getMaxValue()
+                  )));
+            }
+            return result.stream();
+          } catch (MetadataNotFoundException me) {
+            throw new HoodieMetadataException("Unable to find column range 
metadata for partition:" + partitionName, me);
+          }
+        }
+      }, Math.max(partitions.size(), 1));
+    } else {
+      // Obtain the latest data files from all the partitions.
+      List<Pair<String, String>> partitionPathFileIDList = 
getLatestBaseFilesForAllPartitions(partitions, context,

Review comment:
       fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to