manojpec commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r784650861
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -159,6 +173,71 @@ public HoodieBloomIndex(HoodieWriteConfig config,
BaseHoodieBloomIndexHelper blo
}
}
+ /**
+ * Load the column stats index as BloomIndexFileInfo for all the involved
files in the given partitions.
+ *
+ * @param partitions - List of partitions for which column stats need to be
loaded
+ * @param context - Engine context
+ * @param hoodieTable - Hoodie table
+ */
+ List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+ List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ if (config.getBloomIndexPruneByRanges()) {
+ // also obtain file ranges, if range pruning is enabled
+ context.setJobStatus(this.getClass().getName(), "Obtain key ranges for
file slices (range pruning=on)");
+
+ final String keyField =
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+ return context.flatMap(partitions, new SerializableFunction<String,
Stream<Pair<String, BloomIndexFileInfo>>>() {
+ @Override
+ public Stream<Pair<String, BloomIndexFileInfo>> apply(String
partitionName) throws Exception {
+ final String columnIndexID = new
ColumnIndexID(keyField).asBase64EncodedString();
+ final String partitionIndexID = new
PartitionIndexID(partitionName).asBase64EncodedString();
+
+ List<Pair<String, String>> partitionFileIdList =
+ HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
+ hoodieTable).stream().map(baseFile ->
Pair.of(baseFile.getFileId(), baseFile.getFileName()))
+ .collect(toList());
+ try {
+ Map<String, String> columnStatKeyToFileIdMap = new HashMap<>();
+ List<String> columnStatKeys = new ArrayList<>();
+ for (Pair<String, String> fileIdFileName : partitionFileIdList) {
+ final String columnStatIndexKey = columnIndexID
+ .concat(partitionIndexID)
+ .concat(new
FileIndexID(fileIdFileName.getLeft()).asBase64EncodedString());
+ columnStatKeys.add(columnStatIndexKey);
+ columnStatKeyToFileIdMap.put(columnStatIndexKey,
fileIdFileName.getLeft());
+ }
+ Collections.sort(columnStatKeys);
+
+ Map<String, HoodieColumnStats> columnKeyHashToStatMap = hoodieTable
+ .getMetadataTable().getColumnStats(columnStatKeys);
+ List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
+ for (Map.Entry<String, HoodieColumnStats> entry :
columnKeyHashToStatMap.entrySet()) {
+ result.add(Pair.of(partitionName,
+ new BloomIndexFileInfo(
+ columnStatKeyToFileIdMap.get(entry.getKey()),
+ entry.getValue().getMinValue(),
+ entry.getValue().getMaxValue()
+ )));
+ }
+ return result.stream();
+ } catch (MetadataNotFoundException me) {
+ throw new HoodieMetadataException("Unable to find column range
metadata for partition:" + partitionName, me);
+ }
+ }
+ }, Math.max(partitions.size(), 1));
+ } else {
+ // Obtain the latest data files from all the partitions.
+ List<Pair<String, String>> partitionPathFileIDList =
getLatestBaseFilesForAllPartitions(partitions, context,
Review comment:
fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]