nsivabalan commented on code in PR #14048:
URL: https://github.com/apache/hudi/pull/14048#discussion_r2402899933


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -406,22 +408,25 @@ public static HoodiePairData<String, 
List<HoodieColumnRangeMetadata<Comparable>>
 
       // Step 2: Compute expression index records for the modified partitions
       LOG.debug("Indexing following columns for partition stats index: {}", 
validColumnsToIndex);
-      List<String> partitionPaths = new 
ArrayList<>(commitMetadata.getWritePartitionPaths());
-      HoodieTableFileSystemView fileSystemView = 
getFileSystemViewForMetadataTable(dataMetaClient);
-      int parallelism = Math.max(Math.min(partitionPaths.size(), 
metadataConfig.getPartitionStatsIndexParallelism()), 1);
-      return engineContext.parallelize(partitionPaths, 
parallelism).mapToPair(partitionName -> {
+      List<List<HoodieWriteStat>> partitionedWriteStats = new 
ArrayList<>(commitMetadata.getWriteStats().stream()
+          .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))
+          .values());
+
+      Map<String, Set<String>> fileGroupIdsToReplaceMap = (commitMetadata 
instanceof HoodieReplaceCommitMetadata)
+          ? ((HoodieReplaceCommitMetadata) 
commitMetadata).getPartitionToReplaceFileIds()
+          .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e 
-> new HashSet<>(e.getValue())))
+          : Collections.emptyMap();
+
+      String maxInstantTime = 
HoodieMetadataWriteUtils.getMaxInstantTime(dataMetaClient, instantTime);
+      int parallelism = Math.max(Math.min(partitionedWriteStats.size(), 
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+      return engineContext.parallelize(partitionedWriteStats, 
parallelism).mapToPair(partitionedWriteStat -> {
+        final String partitionName = 
partitionedWriteStat.get(0).getPartitionPath();
         checkState(tableMetadata != null, "tableMetadata should not be null 
when scanning metadata table");
         // Collect Column Metadata for Each File part of active file system 
view of latest snapshot
         // Get all file names, including log files, in a set from the file 
slices
-        // TODO(yihua): fix this
-        Set<String> fileNames = 
HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(dataMetaClient,
 Option.of(fileSystemView), partitionName).stream()
-            .flatMap(fileSlice -> Stream.concat(
-                
Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
-                fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
-            .filter(Objects::nonNull)
-            .collect(Collectors.toSet());
+        Set<String> fileNames = 
HoodieMetadataWriteUtils.getFilesToFetchColumnStats(partitionedWriteStat, 
dataMetaClient, tableMetadata, dataWriteConfig, partitionName, maxInstantTime, 
instantTime,

Review Comment:
   The expression index is slightly different.
   Can you go back to the caller of this?
   Because we can't compute the expression index from within a Spark task, 
expression index computation is slightly different from the partition stats 
index, where we compute the column stats for individual files within the same 
Spark task.
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -406,22 +408,25 @@ public static HoodiePairData<String, 
List<HoodieColumnRangeMetadata<Comparable>>
 
       // Step 2: Compute expression index records for the modified partitions
       LOG.debug("Indexing following columns for partition stats index: {}", 
validColumnsToIndex);
-      List<String> partitionPaths = new 
ArrayList<>(commitMetadata.getWritePartitionPaths());
-      HoodieTableFileSystemView fileSystemView = 
getFileSystemViewForMetadataTable(dataMetaClient);
-      int parallelism = Math.max(Math.min(partitionPaths.size(), 
metadataConfig.getPartitionStatsIndexParallelism()), 1);
-      return engineContext.parallelize(partitionPaths, 
parallelism).mapToPair(partitionName -> {
+      List<List<HoodieWriteStat>> partitionedWriteStats = new 
ArrayList<>(commitMetadata.getWriteStats().stream()
+          .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))
+          .values());
+
+      Map<String, Set<String>> fileGroupIdsToReplaceMap = (commitMetadata 
instanceof HoodieReplaceCommitMetadata)
+          ? ((HoodieReplaceCommitMetadata) 
commitMetadata).getPartitionToReplaceFileIds()
+          .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e 
-> new HashSet<>(e.getValue())))
+          : Collections.emptyMap();
+
+      String maxInstantTime = 
HoodieMetadataWriteUtils.getMaxInstantTime(dataMetaClient, instantTime);
+      int parallelism = Math.max(Math.min(partitionedWriteStats.size(), 
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+      return engineContext.parallelize(partitionedWriteStats, 
parallelism).mapToPair(partitionedWriteStat -> {
+        final String partitionName = 
partitionedWriteStat.get(0).getPartitionPath();
         checkState(tableMetadata != null, "tableMetadata should not be null 
when scanning metadata table");
         // Collect Column Metadata for Each File part of active file system 
view of latest snapshot
         // Get all file names, including log files, in a set from the file 
slices
-        // TODO(yihua): fix this
-        Set<String> fileNames = 
HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(dataMetaClient,
 Option.of(fileSystemView), partitionName).stream()
-            .flatMap(fileSlice -> Stream.concat(
-                
Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
-                fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
-            .filter(Objects::nonNull)
-            .collect(Collectors.toSet());
+        Set<String> fileNames = 
HoodieMetadataWriteUtils.getFilesToFetchColumnStats(partitionedWriteStat, 
dataMetaClient, tableMetadata, dataWriteConfig, partitionName, maxInstantTime, 
instantTime,

Review Comment:
   The expression index is slightly different.
   Can you go back to the caller of this?
   Because we can't compute the expression index from within a Spark task, 
expression index computation is slightly different from the partition stats 
index, where we compute the column stats for individual files within a map 
call.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to