codope commented on code in PR #12050:
URL: https://github.com/apache/hudi/pull/12050#discussion_r1798721593
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2127,29 +2159,76 @@ public static HoodieData<HoodieRecord> convertMetadataToPartitionStatsRecords(Ho
         .collect(Collectors.toList());
     int parallelism = Math.max(Math.min(partitionedWriteStats.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
+    boolean shouldScanColStatsForTightBound = MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient)
+        && metadataConfig.isPartitionStatsIndexTightBoundEnabled()
+        && WriteOperationType.isPartitionStatsTightBoundRequired(commitMetadata.getOperationType());
+    HoodieTableMetadata tableMetadata;
+    if (shouldScanColStatsForTightBound) {
+      tableMetadata = HoodieTableMetadata.create(engineContext, dataMetaClient.getStorage(), metadataConfig, dataMetaClient.getBasePath().toString());
+    } else {
+      tableMetadata = null;
+    }
     return engineContext.parallelize(partitionedWriteStats, parallelism).flatMap(partitionedWriteStat -> {
       final String partitionName = partitionedWriteStat.get(0).getPartitionPath();
-      // Step 1: Collect Column Metadata for Each File
+      // Step 1: Collect Column Metadata for Each File part of current commit metadata
       List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionedWriteStat.stream()
-          .map(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, columnsToIndex))
-          .collect(Collectors.toList());
+          .map(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, columnsToIndex, tableSchema))
+          .collect(Collectors.toList());
+      if (shouldScanColStatsForTightBound) {
+        checkState(tableMetadata != null, "tableMetadata should not be null when scanning metadata table");
+        // Collect Column Metadata for Each File part of active file system view of latest snapshot
+        // Get all file names, including log files, in a set from the file slices
+        Set<String> fileNames = getPartitionLatestFileSlices(dataMetaClient, Option.empty(), partitionName).stream()
+            .flatMap(fileSlice -> Stream.concat(
+                Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
+                fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
+            .filter(Objects::nonNull)
+            .collect(Collectors.toSet());
+        // Fetch metadata table COLUMN_STATS partition records for above files
+        List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata =
+            tableMetadata.getRecordsByKeyPrefixes(generateKeyPrefixes(columnsToIndex, partitionName), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false)
Review Comment:
Yes, that's the intention. But I assumed that when a file group is picked up for compaction, all file slices of that file group will be compacted. However, we can reason about uncompacted file slices by looking at the log files' deltacommit timestamps.
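
To illustrate the reasoning above, here is a minimal, hedged sketch. The `SliceView` record and `hasUncompactedLogData` helper are simplified stand-ins invented for this example, not Hudi's real `FileSlice`/`HoodieLogFile` APIs; it relies only on the fact that Hudi instant times are fixed-width timestamp strings, so later instants compare greater lexicographically:

```java
import java.util.List;

public class UncompactedSliceCheck {
  // Hypothetical, simplified view of a file slice: the base file's instant time
  // plus the deltacommit instant times of its log files.
  record SliceView(String baseInstant, List<String> logDeltaCommits) {}

  // A slice still carries uncompacted log data if any log file was written by a
  // deltacommit later than the base file's instant (i.e. after the compaction
  // that produced the base file).
  static boolean hasUncompactedLogData(SliceView slice) {
    return slice.logDeltaCommits().stream()
        .anyMatch(dc -> dc.compareTo(slice.baseInstant()) > 0);
  }

  public static void main(String[] args) {
    // Log written before the base file's instant: already folded in by compaction.
    SliceView compacted = new SliceView("20240101120000", List.of("20240101100000"));
    // Log written after the base file's instant: not yet compacted.
    SliceView pending = new SliceView("20240101120000", List.of("20240101130000"));
    System.out.println(hasUncompactedLogData(compacted)); // false
    System.out.println(hasUncompactedLogData(pending));   // true
  }
}
```

The same lexicographic comparison against the last compaction instant would let the code above distinguish slices that were fully compacted from those still carrying newer log data.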
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]