codope commented on code in PR #12050:
URL: https://github.com/apache/hudi/pull/12050#discussion_r1798772767
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2127,29 +2159,76 @@ public static HoodieData<HoodieRecord> convertMetadataToPartitionStatsRecords(Ho
         .collect(Collectors.toList());
     int parallelism = Math.max(Math.min(partitionedWriteStats.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
+    boolean shouldScanColStatsForTightBound = MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient)
+        && metadataConfig.isPartitionStatsIndexTightBoundEnabled() && WriteOperationType.isPartitionStatsTightBoundRequired(commitMetadata.getOperationType());
+    HoodieTableMetadata tableMetadata;
+    if (shouldScanColStatsForTightBound) {
+      tableMetadata = HoodieTableMetadata.create(engineContext, dataMetaClient.getStorage(), metadataConfig, dataMetaClient.getBasePath().toString());
+    } else {
+      tableMetadata = null;
+    }
     return engineContext.parallelize(partitionedWriteStats, parallelism).flatMap(partitionedWriteStat -> {
       final String partitionName = partitionedWriteStat.get(0).getPartitionPath();
-      // Step 1: Collect Column Metadata for Each File
+      // Step 1: Collect Column Metadata for Each File part of current commit metadata
       List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionedWriteStat.stream()
-          .map(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, columnsToIndex))
-          .collect(Collectors.toList());
+          .map(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, columnsToIndex, tableSchema))
+          .collect(Collectors.toList());
+      if (shouldScanColStatsForTightBound) {
+        checkState(tableMetadata != null, "tableMetadata should not be null when scanning metadata table");
+        // Collect Column Metadata for Each File part of active file system view of latest snapshot
+        // Get all file names, including log files, in a set from the file slices
+        Set<String> fileNames = getPartitionLatestFileSlices(dataMetaClient, Option.empty(), partitionName).stream()
+            .flatMap(fileSlice -> Stream.concat(
+                Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
+                fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
+            .filter(Objects::nonNull)
+            .collect(Collectors.toSet());
+        // Fetch metadata table COLUMN_STATS partition records for above files
+        List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata =
+            tableMetadata.getRecordsByKeyPrefixes(generateKeyPrefixes(columnsToIndex, partitionName), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false)
Review Comment:
Actually, I misunderstood. I was only thinking about file groups within a single partition, some compacted and some uncompacted. But you're talking about bringing all partitions to a tighter bound. We could do that, but it's going to be expensive. I think we should punt it to future work. In the worst case, we will scan a few more partitions than necessary.
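For readers skimming the hunk above, the file-name collection step can be sketched in isolation. This is a minimal, self-contained sketch, not Hudi code: the `FileSlice` record here is a simplified stand-in for Hudi's `FileSlice`/`HoodieBaseFile`/`HoodieLogFile` types, but the `Stream.concat` + `filter(Objects::nonNull)` shape mirrors the diff.

```java
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FileNameCollector {
  // Simplified stand-in for a Hudi file slice: an optional base file plus log files.
  record FileSlice(Optional<String> baseFileName, List<String> logFileNames) {}

  // Mirrors the hunk: concat the (possibly absent) base file name with all log
  // file names, drop the null from an absent base file, and collect into a Set.
  static Set<String> collectFileNames(List<FileSlice> slices) {
    return slices.stream()
        .flatMap(slice -> Stream.concat(
            Stream.of(slice.baseFileName().orElse(null)),
            slice.logFileNames().stream()))
        .filter(Objects::nonNull)
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    List<FileSlice> slices = List.of(
        new FileSlice(Optional.of("f1.parquet"), List.of("f1.log.1")),
        new FileSlice(Optional.empty(), List.of("f2.log.1", "f2.log.2")));
    // Log-only slices (no base file yet) still contribute their log file names.
    System.out.println(collectFileNames(slices));
  }
}
```

The null-then-filter dance exists because a file slice may have log files but no base file yet (e.g. before compaction), so the base-file stream element can be absent.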
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]