manojpec commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r784653402



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -159,6 +170,75 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
     }
   }
 
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   * @return List of partition and file column range info pairs
+   */
+  List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+      List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    if (config.getBloomIndexPruneByRanges()) {
+      // also obtain file ranges, if range pruning is enabled
+      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
+
+      final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+      return context.flatMap(partitions, new SerializableFunction<String, Stream<Pair<String, BloomIndexFileInfo>>>() {
+        @Override
+        public Stream<Pair<String, BloomIndexFileInfo>> apply(String partitionName) throws Exception {
+          final String columnIndexID = new ColumnIndexID(keyField).asBase64EncodedString();
+          final String partitionIndexID = new PartitionIndexID(partitionName).asBase64EncodedString();
+
+          List<Pair<String, String>> partitionFileIdList =
+              HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
+                      hoodieTable).stream().map(baseFile -> Pair.of(baseFile.getFileId(), baseFile.getFileName()))
+                  .collect(toList());
+          try {
+            Map<String, String> columnStatKeyToFileIdMap = new HashMap<>();
+            List<String> columnStatKeys = new ArrayList<>();
+            for (Pair<String, String> fileIdFileName : partitionFileIdList) {
+              final String columnStatIndexKey = columnIndexID
+                  .concat(partitionIndexID)
+                  .concat(new FileIndexID(fileIdFileName.getRight()).asBase64EncodedString());
+              columnStatKeys.add(columnStatIndexKey);
+              columnStatKeyToFileIdMap.put(columnStatIndexKey, fileIdFileName.getLeft());
+            }
+            if (columnStatKeys.isEmpty()) {
+              return Stream.empty();
+            }
+
+            Collections.sort(columnStatKeys);
+            Map<String, HoodieColumnStats> columnKeyHashToStatMap = hoodieTable
+                .getMetadataTable().getColumnStats(columnStatKeys);

Review comment:
       Fixed the signature. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


