yihua commented on code in PR #14048:
URL: https://github.com/apache/hudi/pull/14048#discussion_r2403029439


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -406,22 +408,25 @@ public static HoodiePairData<String, List<HoodieColumnRangeMetadata<Comparable>>

       // Step 2: Compute expression index records for the modified partitions
       LOG.debug("Indexing following columns for partition stats index: {}", validColumnsToIndex);
-      List<String> partitionPaths = new ArrayList<>(commitMetadata.getWritePartitionPaths());
-      HoodieTableFileSystemView fileSystemView = getFileSystemViewForMetadataTable(dataMetaClient);
-      int parallelism = Math.max(Math.min(partitionPaths.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
-      return engineContext.parallelize(partitionPaths, parallelism).mapToPair(partitionName -> {
+      List<List<HoodieWriteStat>> partitionedWriteStats = new ArrayList<>(commitMetadata.getWriteStats().stream()
+          .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))
+          .values());
+
+      Map<String, Set<String>> fileGroupIdsToReplaceMap = (commitMetadata instanceof HoodieReplaceCommitMetadata)
+          ? ((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds()
+          .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue())))
+          : Collections.emptyMap();
+
+      String maxInstantTime = HoodieMetadataWriteUtils.getMaxInstantTime(dataMetaClient, instantTime);
+      int parallelism = Math.max(Math.min(partitionedWriteStats.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
+      return engineContext.parallelize(partitionedWriteStats, parallelism).mapToPair(partitionedWriteStat -> {
+        final String partitionName = partitionedWriteStat.get(0).getPartitionPath();
         checkState(tableMetadata != null, "tableMetadata should not be null when scanning metadata table");
         // Collect Column Metadata for Each File part of active file system view of latest snapshot
         // Get all file names, including log files, in a set from the file slices
-        // TODO(yihua): fix this
-        Set<String> fileNames = HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.of(fileSystemView), partitionName).stream()
-            .flatMap(fileSlice -> Stream.concat(
-                Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
-                fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
-            .filter(Objects::nonNull)
-            .collect(Collectors.toSet());
+        Set<String> fileNames = HoodieMetadataWriteUtils.getFilesToFetchColumnStats(partitionedWriteStat, dataMetaClient, tableMetadata, dataWriteConfig, partitionName, maxInstantTime, instantTime,

Review Comment:
   Got it, that makes sense. I renamed the method to avoid confusion and make its purpose clear.
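
For readers following along, the core of the change (grouping the commit's write stats by partition path and clamping the parallelism) can be sketched in isolation. This is a minimal sketch, not Hudi code: the `WriteStat` record and the literal parallelism cap are illustrative stand-ins for `HoodieWriteStat` and `metadataConfig.getPartitionStatsIndexParallelism()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class PartitionGroupingSketch {
  // Minimal stand-in for HoodieWriteStat: only the partition path matters here.
  record WriteStat(String partitionPath, String fileId) {}

  public static void main(String[] args) {
    List<WriteStat> writeStats = List.of(
        new WriteStat("2024/01/01", "f1"),
        new WriteStat("2024/01/01", "f2"),
        new WriteStat("2024/01/02", "f3"));

    // Group write stats by partition path, one inner list per modified partition,
    // mirroring the Collectors.groupingBy call in the diff above.
    List<List<WriteStat>> partitionedWriteStats = new ArrayList<>(writeStats.stream()
        .collect(Collectors.groupingBy(WriteStat::partitionPath))
        .values());

    // Clamp parallelism to [1, configured max]; 200 is an illustrative placeholder.
    int configuredParallelism = 200;
    int parallelism = Math.max(Math.min(partitionedWriteStats.size(), configuredParallelism), 1);

    System.out.println(partitionedWriteStats.size()); // → 2 (distinct partitions touched)
    System.out.println(parallelism);                  // → 2
  }
}
```

Because each inner list is non-empty by construction of `groupingBy`, reading the partition name from element 0 inside the mapper (as the diff does) is safe.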



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
