yihua commented on code in PR #14048:
URL: https://github.com/apache/hudi/pull/14048#discussion_r2402855471
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -406,22 +408,25 @@ public static HoodiePairData<String, List<HoodieColumnRangeMetadata<Comparable>>
     // Step 2: Compute expression index records for the modified partitions
     LOG.debug("Indexing following columns for partition stats index: {}", validColumnsToIndex);
-    List<String> partitionPaths = new ArrayList<>(commitMetadata.getWritePartitionPaths());
-    HoodieTableFileSystemView fileSystemView = getFileSystemViewForMetadataTable(dataMetaClient);
-    int parallelism = Math.max(Math.min(partitionPaths.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
-    return engineContext.parallelize(partitionPaths, parallelism).mapToPair(partitionName -> {
+    List<List<HoodieWriteStat>> partitionedWriteStats = new ArrayList<>(commitMetadata.getWriteStats().stream()
+        .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))
+        .values());
+
+    Map<String, Set<String>> fileGroupIdsToReplaceMap = (commitMetadata instanceof HoodieReplaceCommitMetadata)
+        ? ((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds()
+            .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue())))
+        : Collections.emptyMap();
+
+    String maxInstantTime = HoodieMetadataWriteUtils.getMaxInstantTime(dataMetaClient, instantTime);
+    int parallelism = Math.max(Math.min(partitionedWriteStats.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
+    return engineContext.parallelize(partitionedWriteStats, parallelism).mapToPair(partitionedWriteStat -> {
+      final String partitionName = partitionedWriteStat.get(0).getPartitionPath();
       checkState(tableMetadata != null, "tableMetadata should not be null when scanning metadata table");
       // Collect Column Metadata for Each File part of active file system view of latest snapshot
       // Get all file names, including log files, in a set from the file slices
-      // TODO(yihua): fix this
-      Set<String> fileNames = HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.of(fileSystemView), partitionName).stream()
-          .flatMap(fileSlice -> Stream.concat(
-              Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
-              fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
-          .filter(Objects::nonNull)
-          .collect(Collectors.toSet());
+      Set<String> fileNames = HoodieMetadataWriteUtils.getFilesToFetchColumnStats(partitionedWriteStat, dataMetaClient, tableMetadata, dataWriteConfig, partitionName, maxInstantTime, instantTime,
Review Comment:
For the expression index, we have to compute the stats for all files based on the expressions, including the files in the `partitionedWriteStat`, correct?
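
To make the pattern in the diff above concrete, here is a minimal standalone sketch of grouping write stats by partition path and clamping the parallelism to `[1, configured]`. Note that `WriteStat`, `groupByPartition`, and `clampParallelism` are hypothetical stand-ins for illustration only, not Hudi APIs; the real `HoodieWriteStat` carries many more fields.

```java
import java.util.*;
import java.util.stream.*;

public class PartitionGrouping {

  // Hypothetical stand-in for HoodieWriteStat: only the partition path matters here.
  static final class WriteStat {
    final String partitionPath;
    WriteStat(String partitionPath) { this.partitionPath = partitionPath; }
    String getPartitionPath() { return partitionPath; }
  }

  // Group write stats by partition path, mirroring the Collectors.groupingBy call in the diff.
  static List<List<WriteStat>> groupByPartition(List<WriteStat> stats) {
    return new ArrayList<>(stats.stream()
        .collect(Collectors.groupingBy(WriteStat::getPartitionPath))
        .values());
  }

  // Clamp parallelism, mirroring Math.max(Math.min(n, configured), 1) in the diff:
  // never exceed the configured value, and never drop below 1.
  static int clampParallelism(int numPartitions, int configured) {
    return Math.max(Math.min(numPartitions, configured), 1);
  }

  public static void main(String[] args) {
    List<WriteStat> stats = Arrays.asList(
        new WriteStat("2024/01/01"),
        new WriteStat("2024/01/02"),
        new WriteStat("2024/01/01"));
    List<List<WriteStat>> grouped = groupByPartition(stats);
    System.out.println(grouped.size());                        // 2 distinct partitions
    System.out.println(clampParallelism(grouped.size(), 10));  // 2
    System.out.println(clampParallelism(0, 10));               // clamped up to 1
  }
}
```

The clamp keeps `engineContext.parallelize` from being called with zero partitions when a commit writes no data, while still capping fan-out at the configured parallelism.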
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -471,44 +471,18 @@ public static HoodieData<HoodieRecord> convertMetadataToPartitionStatRecords(Hoo
     Option<String> lastCompletedInstant = dataMetaClient.getActiveTimeline().filterCompletedInstants()
Review Comment:
This can be removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]