vinothchandar commented on code in PR #10352: URL: https://github.com/apache/hudi/pull/10352#discussion_r1498494219
########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ########## @@ -1915,4 +1924,162 @@ private static Path filePath(String basePath, String partition, String filename) return new Path(basePath, partition + StoragePath.SEPARATOR + filename); } } + + public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext, + List<DirectoryInfo> partitionInfoList, + MetadataRecordsGenerationParams recordsGenerationParams) { + // Find the columns to index + HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient(); + final List<String> columnsToIndex = getColumnsToIndex( + recordsGenerationParams, + Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient))); + if (columnsToIndex.isEmpty()) { + return engineContext.emptyHoodieData(); + } + LOG.debug(String.format("Indexing %d columns for partition stats index", columnsToIndex.size())); + // Create records for MDT + int parallelism = Math.max(Math.min(partitionInfoList.size(), recordsGenerationParams.getPartitionStatsIndexParallelism()), 1); + return engineContext.parallelize(partitionInfoList, parallelism).flatMap(partitionFiles -> { + final String partitionName = partitionFiles.getRelativePath(); + Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = partitionFiles.getFileNameToSizeMap().keySet().stream() + .map(fileName -> getFileStatsRangeMetadata(partitionName, partitionName + "/" + fileName, dataTableMetaClient, columnsToIndex, false)) + .map(BaseFileUtils::getColumnRangeInPartition); Review Comment: IIUC, aren't you passing all column ranges for a given file into this? Wouldn't that merge ranges across different columns within a given file? 
########## hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java: ########## @@ -67,6 +70,50 @@ public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) { throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet."); } + /** + * Aggregate column range statistics across files in a partition. + * + * @param fileRanges List of column range statistics for each file in a partition + */ + public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> getColumnRangeInPartition(@Nonnull List<HoodieColumnRangeMetadata<T>> fileRanges) { + if (fileRanges.size() == 1) { + // Only one parquet file, we can just return that range. + return fileRanges.get(0); + } + // There are multiple files. Compute min(file_mins) and max(file_maxs) + return fileRanges.stream() + .sequential() + .reduce(BaseFileUtils::mergeRanges).get(); Review Comment: We should be merging across files on a given column. ########## hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java: ########## @@ -67,6 +70,50 @@ public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) { throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet."); } + /** + * Aggregate column range statistics across files in a partition. + * + * @param fileRanges List of column range statistics for each file in a partition + */ + public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> getColumnRangeInPartition(@Nonnull List<HoodieColumnRangeMetadata<T>> fileRanges) { + if (fileRanges.size() == 1) { Review Comment: I think this is a column range, and not multiple files? I.e., the list has an entry for each column for a single file, no? 
########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ########## @@ -1915,4 +1924,162 @@ private static Path filePath(String basePath, String partition, String filename) return new Path(basePath, partition + StoragePath.SEPARATOR + filename); } } + + public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext, + List<DirectoryInfo> partitionInfoList, + MetadataRecordsGenerationParams recordsGenerationParams) { + // Find the columns to index + HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient(); + final List<String> columnsToIndex = getColumnsToIndex( + recordsGenerationParams, + Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient))); + if (columnsToIndex.isEmpty()) { + return engineContext.emptyHoodieData(); + } + LOG.debug(String.format("Indexing %d columns for partition stats index", columnsToIndex.size())); + // Create records for MDT + int parallelism = Math.max(Math.min(partitionInfoList.size(), recordsGenerationParams.getPartitionStatsIndexParallelism()), 1); + return engineContext.parallelize(partitionInfoList, parallelism).flatMap(partitionFiles -> { Review Comment: rename: partitionFiles -> partitionInfo ########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ########## @@ -1915,4 +1924,162 @@ private static Path filePath(String basePath, String partition, String filename) return new Path(basePath, partition + StoragePath.SEPARATOR + filename); } } + + public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext, + List<DirectoryInfo> partitionInfoList, + MetadataRecordsGenerationParams recordsGenerationParams) { + // Find the columns to index + HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient(); + final List<String> columnsToIndex = getColumnsToIndex( + recordsGenerationParams, + Lazy.lazily(() -> 
tryResolveSchemaForTable(dataTableMetaClient))); + if (columnsToIndex.isEmpty()) { + return engineContext.emptyHoodieData(); + } + LOG.debug(String.format("Indexing %d columns for partition stats index", columnsToIndex.size())); + // Create records for MDT + int parallelism = Math.max(Math.min(partitionInfoList.size(), recordsGenerationParams.getPartitionStatsIndexParallelism()), 1); + return engineContext.parallelize(partitionInfoList, parallelism).flatMap(partitionFiles -> { + final String partitionName = partitionFiles.getRelativePath(); + Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = partitionFiles.getFileNameToSizeMap().keySet().stream() + .map(fileName -> getFileStatsRangeMetadata(partitionName, partitionName + "/" + fileName, dataTableMetaClient, columnsToIndex, false)) Review Comment: Are we reading all the column stats again to generate partition stats? I think we should avoid doing this extra work and piggyback off the column stats index values. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org