nsivabalan commented on code in PR #9223:
URL: https://github.com/apache/hudi/pull/9223#discussion_r1290861309
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -915,65 +903,60 @@ public static HoodieData<HoodieRecord>
convertFilesToColumnStatsRecords(HoodieEn
Map<String, List<String>> partitionToDeletedFiles,
Map<String, Map<String, Long>> partitionToAppendedFiles,
MetadataRecordsGenerationParams recordsGenerationParams) {
- HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+ // Find the columns to index
HoodieTableMetaClient dataTableMetaClient =
recordsGenerationParams.getDataMetaClient();
-
final List<String> columnsToIndex =
getColumnsToIndex(recordsGenerationParams,
Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
-
if (columnsToIndex.isEmpty()) {
// In case there are no columns to index, bail
return engineContext.emptyHoodieData();
}
- final List<Pair<String, List<String>>> partitionToDeletedFilesList =
partitionToDeletedFiles.entrySet().stream()
- .map(e -> Pair.of(e.getKey(), e.getValue()))
- .collect(Collectors.toList());
-
- int deletedFilesTargetParallelism =
Math.max(Math.min(partitionToDeletedFilesList.size(),
recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
- final HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD =
- engineContext.parallelize(partitionToDeletedFilesList,
deletedFilesTargetParallelism);
-
- HoodieData<HoodieRecord> deletedFilesRecordsRDD =
partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
- final String partitionPath = partitionToDeletedFilesPair.getLeft();
- final String partitionId = getPartitionIdentifier(partitionPath);
- final List<String> deletedFileList =
partitionToDeletedFilesPair.getRight();
-
- return deletedFileList.stream().flatMap(deletedFile -> {
- final String filePathWithPartition = partitionPath + "/" + deletedFile;
- return getColumnStatsRecords(partitionId, filePathWithPartition,
dataTableMetaClient, columnsToIndex, true);
- }).iterator();
- });
-
- allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
-
- final List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList =
partitionToAppendedFiles.entrySet().stream()
- .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
- .collect(Collectors.toList());
-
- int appendedFilesTargetParallelism =
Math.max(Math.min(partitionToAppendedFilesList.size(),
recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
- final HoodieData<Pair<String, Map<String, Long>>>
partitionToAppendedFilesRDD =
- engineContext.parallelize(partitionToAppendedFilesList,
appendedFilesTargetParallelism);
-
- HoodieData<HoodieRecord> appendedFilesRecordsRDD =
partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
- final String partitionPath = partitionToAppendedFilesPair.getLeft();
- final String partitionId = getPartitionIdentifier(partitionPath);
- final Map<String, Long> appendedFileMap =
partitionToAppendedFilesPair.getRight();
+ LOG.info(String.format("Indexing %d columns for column stats index",
columnsToIndex.size()));
+
+ // Total number of files which are added or deleted
+ final int totalFiles =
partitionToDeletedFiles.values().stream().mapToInt(List::size).sum()
+ + partitionToAppendedFiles.values().stream().mapToInt(Map::size).sum();
+
+ // Create the tuple (partition, filename, isDeleted) to handle both
deletes and appends
+ final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList =
new ArrayList<>(totalFiles);
Review Comment:
We do N * M work, where N = the number of columns to index and M = the number of
(partition, filename, isDeleted) tuples. So we don't need it here — you can
verify this in the method call:
getColumnStatsRecords(partitionId, filePathWithPartition,
dataTableMetaClient, columnsToIndex, isDeleted).iterator();
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]