nsivabalan commented on code in PR #14048:
URL: https://github.com/apache/hudi/pull/14048#discussion_r2402899933
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -406,22 +408,25 @@ public static HoodiePairData<String,
List<HoodieColumnRangeMetadata<Comparable>>
// Step 2: Compute expression index records for the modified partitions
LOG.debug("Indexing following columns for partition stats index: {}",
validColumnsToIndex);
- List<String> partitionPaths = new
ArrayList<>(commitMetadata.getWritePartitionPaths());
- HoodieTableFileSystemView fileSystemView =
getFileSystemViewForMetadataTable(dataMetaClient);
- int parallelism = Math.max(Math.min(partitionPaths.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
- return engineContext.parallelize(partitionPaths,
parallelism).mapToPair(partitionName -> {
+ List<List<HoodieWriteStat>> partitionedWriteStats = new
ArrayList<>(commitMetadata.getWriteStats().stream()
+ .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))
+ .values());
+
+ Map<String, Set<String>> fileGroupIdsToReplaceMap = (commitMetadata
instanceof HoodieReplaceCommitMetadata)
+ ? ((HoodieReplaceCommitMetadata)
commitMetadata).getPartitionToReplaceFileIds()
+ .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e
-> new HashSet<>(e.getValue())))
+ : Collections.emptyMap();
+
+ String maxInstantTime =
HoodieMetadataWriteUtils.getMaxInstantTime(dataMetaClient, instantTime);
+ int parallelism = Math.max(Math.min(partitionedWriteStats.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+ return engineContext.parallelize(partitionedWriteStats,
parallelism).mapToPair(partitionedWriteStat -> {
+ final String partitionName =
partitionedWriteStat.get(0).getPartitionPath();
checkState(tableMetadata != null, "tableMetadata should not be null
when scanning metadata table");
// Collect Column Metadata for Each File part of active file system
view of latest snapshot
// Get all file names, including log files, in a set from the file
slices
- // TODO(yihua): fix this
- Set<String> fileNames =
HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(dataMetaClient,
Option.of(fileSystemView), partitionName).stream()
- .flatMap(fileSlice -> Stream.concat(
-
Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
- fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
- .filter(Objects::nonNull)
- .collect(Collectors.toSet());
+ Set<String> fileNames =
HoodieMetadataWriteUtils.getFilesToFetchColumnStats(partitionedWriteStat,
dataMetaClient, tableMetadata, dataWriteConfig, partitionName, maxInstantTime,
instantTime,
Review Comment:
The expression index is slightly different.
Can you move this back to the caller of this method?
We can't compute the expression index from within a Spark task, because
expression index computation differs from the partition stats index, where we
compute the column stats for individual files within the same Spark task.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -406,22 +408,25 @@ public static HoodiePairData<String,
List<HoodieColumnRangeMetadata<Comparable>>
// Step 2: Compute expression index records for the modified partitions
LOG.debug("Indexing following columns for partition stats index: {}",
validColumnsToIndex);
- List<String> partitionPaths = new
ArrayList<>(commitMetadata.getWritePartitionPaths());
- HoodieTableFileSystemView fileSystemView =
getFileSystemViewForMetadataTable(dataMetaClient);
- int parallelism = Math.max(Math.min(partitionPaths.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
- return engineContext.parallelize(partitionPaths,
parallelism).mapToPair(partitionName -> {
+ List<List<HoodieWriteStat>> partitionedWriteStats = new
ArrayList<>(commitMetadata.getWriteStats().stream()
+ .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))
+ .values());
+
+ Map<String, Set<String>> fileGroupIdsToReplaceMap = (commitMetadata
instanceof HoodieReplaceCommitMetadata)
+ ? ((HoodieReplaceCommitMetadata)
commitMetadata).getPartitionToReplaceFileIds()
+ .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e
-> new HashSet<>(e.getValue())))
+ : Collections.emptyMap();
+
+ String maxInstantTime =
HoodieMetadataWriteUtils.getMaxInstantTime(dataMetaClient, instantTime);
+ int parallelism = Math.max(Math.min(partitionedWriteStats.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+ return engineContext.parallelize(partitionedWriteStats,
parallelism).mapToPair(partitionedWriteStat -> {
+ final String partitionName =
partitionedWriteStat.get(0).getPartitionPath();
checkState(tableMetadata != null, "tableMetadata should not be null
when scanning metadata table");
// Collect Column Metadata for Each File part of active file system
view of latest snapshot
// Get all file names, including log files, in a set from the file
slices
- // TODO(yihua): fix this
- Set<String> fileNames =
HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(dataMetaClient,
Option.of(fileSystemView), partitionName).stream()
- .flatMap(fileSlice -> Stream.concat(
-
Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
- fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
- .filter(Objects::nonNull)
- .collect(Collectors.toSet());
+ Set<String> fileNames =
HoodieMetadataWriteUtils.getFilesToFetchColumnStats(partitionedWriteStat,
dataMetaClient, tableMetadata, dataWriteConfig, partitionName, maxInstantTime,
instantTime,
Review Comment:
The expression index is slightly different.
Can you move this back to the caller of this method?
We can't compute the expression index from within a Spark task, because
expression index computation differs from the partition stats index, where we
compute the column stats for individual files within a map call.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]