codope commented on code in PR #12050:
URL: https://github.com/apache/hudi/pull/12050#discussion_r1798772767


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2127,29 +2159,76 @@ public static HoodieData<HoodieRecord> convertMetadataToPartitionStatsRecords(Ho
           .collect(Collectors.toList());
 
       int parallelism = Math.max(Math.min(partitionedWriteStats.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
+      boolean shouldScanColStatsForTightBound = MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient)
+          && metadataConfig.isPartitionStatsIndexTightBoundEnabled() && WriteOperationType.isPartitionStatsTightBoundRequired(commitMetadata.getOperationType());
+      HoodieTableMetadata tableMetadata;
+      if (shouldScanColStatsForTightBound) {
+        tableMetadata = HoodieTableMetadata.create(engineContext, dataMetaClient.getStorage(), metadataConfig, dataMetaClient.getBasePath().toString());
+      } else {
+        tableMetadata = null;
+      }
       return engineContext.parallelize(partitionedWriteStats, parallelism).flatMap(partitionedWriteStat -> {
         final String partitionName = partitionedWriteStat.get(0).getPartitionPath();
-        // Step 1: Collect Column Metadata for Each File
+        // Step 1: Collect Column Metadata for Each File part of current commit metadata
         List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionedWriteStat.stream()
-                .map(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, columnsToIndex))
-                .collect(Collectors.toList());
+            .map(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, columnsToIndex, tableSchema))
+            .collect(Collectors.toList());
+        if (shouldScanColStatsForTightBound) {
+          checkState(tableMetadata != null, "tableMetadata should not be null when scanning metadata table");
+          // Collect Column Metadata for Each File part of active file system view of latest snapshot
+          // Get all file names, including log files, in a set from the file slices
+          Set<String> fileNames = getPartitionLatestFileSlices(dataMetaClient, Option.empty(), partitionName).stream()
+              .flatMap(fileSlice -> Stream.concat(
+                  Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
+                  fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
+              .filter(Objects::nonNull)
+              .collect(Collectors.toSet());
+          // Fetch metadata table COLUMN_STATS partition records for above files
+          List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata =
+              tableMetadata.getRecordsByKeyPrefixes(generateKeyPrefixes(columnsToIndex, partitionName), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false)

Review Comment:
   Actually, I misunderstood. I was only thinking about file groups within a partition, some compacted and some uncompacted. But you're talking about bringing all partitions to a tighter bound. We could do that, but it's going to be expensive. I think we should punt it to future work. In the worst case, we will scan a few more partitions than necessary.
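For readers following the tight-bound discussion: the partition-level range is ultimately obtained by folding per-file column ranges together (min of mins, max of maxes). A minimal, self-contained sketch of that reduction, using a hypothetical `ColumnRange` stand-in rather than Hudi's `HoodieColumnRangeMetadata` (names and shapes here are illustrative, not Hudi's API):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for HoodieColumnRangeMetadata<Comparable>
final class ColumnRange {
  final String column;
  final Comparable min;
  final Comparable max;

  ColumnRange(String column, Comparable min, Comparable max) {
    this.column = column;
    this.min = min;
    this.max = max;
  }

  // Merge two ranges for the same column: keep the smaller min and larger max.
  @SuppressWarnings("unchecked")
  static ColumnRange merge(ColumnRange a, ColumnRange b) {
    Comparable lo = a.min.compareTo(b.min) <= 0 ? a.min : b.min;
    Comparable hi = a.max.compareTo(b.max) >= 0 ? a.max : b.max;
    return new ColumnRange(a.column, lo, hi);
  }
}

public class PartitionStatsSketch {
  public static void main(String[] args) {
    // Per-file ranges for one column within a single partition
    List<ColumnRange> fileRanges = Arrays.asList(
        new ColumnRange("ts", 10, 50),
        new ColumnRange("ts", 5, 30),
        new ColumnRange("ts", 20, 70));
    // Fold all file-level ranges into one partition-level range
    ColumnRange partitionRange = fileRanges.stream()
        .reduce(ColumnRange::merge)
        .orElseThrow(IllegalStateException::new);
    System.out.println(partitionRange.min + ".." + partitionRange.max); // 5..70
  }
}
```

The "tight" in tight bound is about which file ranges feed this fold: only files in the latest file slices, so ranges from replaced or compacted-away files no longer widen the result.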



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
