codope commented on code in PR #10352:
URL: https://github.com/apache/hudi/pull/10352#discussion_r1584882878
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1872,4 +1883,175 @@ public HoodieRecord next() {
}
};
}
+
+ public static HoodieData<HoodieRecord>
convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
+
List<DirectoryInfo> partitionInfoList,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient) {
+ final List<String> columnsToIndex =
metadataConfig.getColumnsEnabledForColumnStatsIndex();
+ if (columnsToIndex.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+ LOG.debug("Indexing following columns for partition stats index: {}",
columnsToIndex);
+ // Create records for MDT
+ int parallelism = Math.max(Math.min(partitionInfoList.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+ return engineContext.parallelize(partitionInfoList,
parallelism).flatMap(partitionInfo -> {
+ final String partitionPath = partitionInfo.getRelativePath();
+ // Step 1: Collect Column Metadata for Each File (Your existing code
does this)
+ List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata =
partitionInfo.getFileNameToSizeMap().keySet().stream()
+ .map(fileName -> getFileStatsRangeMetadata(partitionPath,
partitionPath + "/" + fileName, dataTableMetaClient, columnsToIndex, false))
+ .collect(toList());
+ // Step 2: Flatten and Group by Column Name
+ Map<String, List<HoodieColumnRangeMetadata<Comparable>>>
columnMetadataMap = fileColumnMetadata.stream()
+ .flatMap(List::stream) // Flatten the list
+
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName,
toList())); // Group by column name
+ // Step 3: Aggregate Column Ranges
+ Stream<HoodieColumnRangeMetadata<Comparable>>
partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream()
+ .map(entry ->
BaseFileUtils.getColumnRangeInPartition(entry.getValue()));
+ return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath,
partitionStatsRangeMetadata.collect(toList()), false).iterator();
+ });
+ }
+
+ private static List<HoodieColumnRangeMetadata<Comparable>>
getFileStatsRangeMetadata(String partitionPath,
+
String filePath,
+
HoodieTableMetaClient datasetMetaClient,
+
List<String> columnsToIndex,
+
boolean isDeleted) {
+ String filePartitionPath = filePath.startsWith("/") ?
filePath.substring(1) : filePath;
+ String fileName = FSUtils.getFileName(filePath, partitionPath);
+ if (isDeleted) {
+ return columnsToIndex.stream()
+ .map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
+ .collect(Collectors.toList());
+ }
+ return readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient,
columnsToIndex);
+ }
+
+ public static HoodieData<HoodieRecord>
convertMetadataToPartitionStatsRecords(HoodieCommitMetadata commitMetadata,
Review Comment:
High-level steps are same but the two methods work with different objects.
This method works with writeStats while `convertFilesToPartitionStatsRecords`
works with actual file itself. Also, this method has some preprocessing which
is not required in the other method. I think the code reads better inline.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]