codope commented on code in PR #12050:
URL: https://github.com/apache/hudi/pull/12050#discussion_r1798721593


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2127,29 +2159,76 @@ public static HoodieData<HoodieRecord> convertMetadataToPartitionStatsRecords(Ho
           .collect(Collectors.toList());
 
       int parallelism = Math.max(Math.min(partitionedWriteStats.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
+      boolean shouldScanColStatsForTightBound = MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient)
+          && metadataConfig.isPartitionStatsIndexTightBoundEnabled() && WriteOperationType.isPartitionStatsTightBoundRequired(commitMetadata.getOperationType());
+      HoodieTableMetadata tableMetadata;
+      if (shouldScanColStatsForTightBound) {
+        tableMetadata = HoodieTableMetadata.create(engineContext, dataMetaClient.getStorage(), metadataConfig, dataMetaClient.getBasePath().toString());
+      } else {
+        tableMetadata = null;
+      }
       return engineContext.parallelize(partitionedWriteStats, parallelism).flatMap(partitionedWriteStat -> {
         final String partitionName = partitionedWriteStat.get(0).getPartitionPath();
-        // Step 1: Collect Column Metadata for Each File
+        // Step 1: Collect Column Metadata for Each File part of current commit metadata
         List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionedWriteStat.stream()
-                .map(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, columnsToIndex))
-                .collect(Collectors.toList());
+            .map(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, columnsToIndex, tableSchema))
+            .collect(Collectors.toList());
+        if (shouldScanColStatsForTightBound) {
+          checkState(tableMetadata != null, "tableMetadata should not be null when scanning metadata table");
+          // Collect Column Metadata for Each File part of active file system view of latest snapshot
+          // Get all file names, including log files, in a set from the file slices
+          Set<String> fileNames = getPartitionLatestFileSlices(dataMetaClient, Option.empty(), partitionName).stream()
+              .flatMap(fileSlice -> Stream.concat(
+                  Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
+                  fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
+              .filter(Objects::nonNull)
+              .collect(Collectors.toSet());
+          // Fetch metadata table COLUMN_STATS partition records for above files
+          List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata =
+              tableMetadata.getRecordsByKeyPrefixes(generateKeyPrefixes(columnsToIndex, partitionName), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false)

Review Comment:
   Yes, that's the intention. I had assumed that when a file group is picked up 
for compaction, all file slices of that file group are compacted. That said, we 
can identify uncompacted file slices by looking at the deltacommit timestamps 
of their log files.
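
A minimal sketch of that timestamp check, under stated assumptions: `LogFileInfo` and `UncompactedLogFilter` are hypothetical stand-ins written for this illustration (not Hudi classes), instant times are assumed to be fixed-width strings so that lexicographic order matches chronological order, and "uncompacted" is taken to mean "written by a deltacommit later than the last compaction instant". The actual rule in the PR may differ.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a log file entry; NOT Hudi's HoodieLogFile.
final class LogFileInfo {
  final String fileName;
  final String deltaCommitTime; // assumed fixed-width instant time, e.g. "20240103000000000"

  LogFileInfo(String fileName, String deltaCommitTime) {
    this.fileName = fileName;
    this.deltaCommitTime = deltaCommitTime;
  }
}

// Hypothetical helper illustrating the timestamp comparison only.
final class UncompactedLogFilter {
  // Returns the names of log files whose deltacommit time is strictly later
  // than the given compaction instant, i.e. files the compaction could not
  // have included (assumption made for this sketch).
  static List<String> uncompactedLogFileNames(List<LogFileInfo> logFiles,
                                              String lastCompactionInstant) {
    return logFiles.stream()
        // Lexicographic comparison; valid only for fixed-width timestamps.
        .filter(lf -> lf.deltaCommitTime.compareTo(lastCompactionInstant) > 0)
        .map(lf -> lf.fileName)
        .collect(Collectors.toList());
  }
}
```

With a compaction instant between two log files' deltacommit times, only the later log file is returned; an empty input yields an empty list.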


