codope commented on code in PR #9223:
URL: https://github.com/apache/hudi/pull/9223#discussion_r1267505437
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -405,6 +405,9 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
             + " bootstrap failed for " + metadataMetaClient.getBasePath(), e);
       }
+      LOG.info(String.format("Initializing %s index with %d mappings and %d file groups.", partitionType.name(), fileGroupCountAndRecordsPair.getKey(),
Review Comment:
Is `info` level necessary? If we are thinking of logging, how about starting
a timer and adding another log line once initialization is complete?
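A minimal sketch of that suggestion, assuming an slf4j-style logger; the class name, method name, and `Runnable` wiring are hypothetical stand-ins for the surrounding initialization code, not the PR's actual structure:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class IndexInitTimingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(IndexInitTimingSketch.class);

  // Hypothetical wrapper around the per-partition index initialization.
  void initializeIndexSketch(String indexName, Runnable initialization) {
    // Start a timer before the potentially slow initialization work.
    final long startTimeMs = System.currentTimeMillis();

    initialization.run();

    // One summary log after initialization completes, carrying the elapsed
    // time, instead of an info-level log before the work starts.
    LOG.info(String.format("Initialized %s index in %d ms",
        indexName, System.currentTimeMillis() - startTimeMs));
  }
}
```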
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -848,64 +851,49 @@ public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEn
                                                                          Map<String, Map<String, Long>> partitionToAppendedFiles,
                                                                          MetadataRecordsGenerationParams recordsGenerationParams,
                                                                          String instantTime) {
-    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
-
-    List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
-        .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
-    int parallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
-    HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList, parallelism);
-
-    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
-      final String partitionName = partitionToDeletedFilesPair.getLeft();
-      final List<String> deletedFileList = partitionToDeletedFilesPair.getRight();
-      return deletedFileList.stream().flatMap(deletedFile -> {
-        if (!FSUtils.isBaseFile(new Path(deletedFile))) {
-          return Stream.empty();
-        }
-
-        final String partition = getPartitionIdentifier(partitionName);
-        return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
-            partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
-      }).iterator();
-    });
-    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
+    // Total number of files which are added or deleted
+    final int totalFiles = partitionToDeletedFiles.values().stream().mapToInt(List::size).sum()
+        + partitionToAppendedFiles.values().stream().mapToInt(Map::size).sum();
+
+    // Create the tuple (partition, filename, isDeleted) to handle both deletes and appends
+    final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = new ArrayList<>(totalFiles);
+    partitionToDeletedFiles.entrySet().stream()
+        .flatMap(entry -> entry.getValue().stream().map(deletedFile -> new Tuple3<>(entry.getKey(), deletedFile, true)))
+        .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
+    partitionToAppendedFiles.entrySet().stream()
+        .flatMap(entry -> entry.getValue().keySet().stream().map(addedFile -> new Tuple3<>(entry.getKey(), addedFile, false)))
+        .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
Review Comment:
We can probably extract this tuple-creation code into a separate method; it
looks repetitive across both the bloom filter and colstats paths.
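A rough sketch of such an extraction, assuming both call sites only need the flattened (partition, filename, isDeleted) view; the helper name and the local `Tuple3` stand-in are illustrative, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class FileFlagTupleSketch {
  // Minimal stand-in for the Tuple3 type used in the diff; the class the PR
  // actually uses may come from a different package.
  static final class Tuple3<A, B, C> {
    final A _1;
    final B _2;
    final C _3;

    Tuple3(A a, B b, C c) {
      this._1 = a;
      this._2 = b;
      this._3 = c;
    }
  }

  // Flattens the deleted/appended file maps into (partition, filename, isDeleted)
  // tuples so the bloom filter and column stats paths can share the logic.
  static List<Tuple3<String, String, Boolean>> fetchPartitionFileFlagTuples(
      Map<String, List<String>> partitionToDeletedFiles,
      Map<String, Map<String, Long>> partitionToAppendedFiles) {
    final int totalFiles = partitionToDeletedFiles.values().stream().mapToInt(List::size).sum()
        + partitionToAppendedFiles.values().stream().mapToInt(Map::size).sum();
    final List<Tuple3<String, String, Boolean>> tuples = new ArrayList<>(totalFiles);
    partitionToDeletedFiles.forEach((partition, deletedFiles) ->
        deletedFiles.forEach(file -> tuples.add(new Tuple3<>(partition, file, true))));
    partitionToAppendedFiles.forEach((partition, appendedFileMap) ->
        appendedFileMap.keySet().forEach(file -> tuples.add(new Tuple3<>(partition, file, false))));
    return tuples;
  }
}
```

Both `convertFilesToBloomFilterRecords` and `convertFilesToColumnStatsRecords` could then call the helper and differ only in how they map each tuple to metadata records.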
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -915,65 +903,60 @@ public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEn
                                                                         Map<String, List<String>> partitionToDeletedFiles,
                                                                         Map<String, Map<String, Long>> partitionToAppendedFiles,
                                                                         MetadataRecordsGenerationParams recordsGenerationParams) {
-    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+    // Find the columns to index
     HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
-
     final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
         Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
-
     if (columnsToIndex.isEmpty()) {
       // In case there are no columns to index, bail
       return engineContext.emptyHoodieData();
     }
-    final List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream()
-        .map(e -> Pair.of(e.getKey(), e.getValue()))
-        .collect(Collectors.toList());
-
-    int deletedFilesTargetParallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    final HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD =
-        engineContext.parallelize(partitionToDeletedFilesList, deletedFilesTargetParallelism);
-
-    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
-      final String partitionPath = partitionToDeletedFilesPair.getLeft();
-      final String partitionId = getPartitionIdentifier(partitionPath);
-      final List<String> deletedFileList = partitionToDeletedFilesPair.getRight();
-
-      return deletedFileList.stream().flatMap(deletedFile -> {
-        final String filePathWithPartition = partitionPath + "/" + deletedFile;
-        return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, true);
-      }).iterator();
-    });
-
-    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
-
-    final List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream()
-        .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
-        .collect(Collectors.toList());
-
-    int appendedFilesTargetParallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    final HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD =
-        engineContext.parallelize(partitionToAppendedFilesList, appendedFilesTargetParallelism);
-
-    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
-      final String partitionPath = partitionToAppendedFilesPair.getLeft();
-      final String partitionId = getPartitionIdentifier(partitionPath);
-      final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight();
+    LOG.info(String.format("Indexing %d columns for column stats index", columnsToIndex.size()));
+
+    // Total number of files which are added or deleted
+    final int totalFiles = partitionToDeletedFiles.values().stream().mapToInt(List::size).sum()
+        + partitionToAppendedFiles.values().stream().mapToInt(Map::size).sum();
+
+    // Create the tuple (partition, filename, isDeleted) to handle both deletes and appends
+    final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = new ArrayList<>(totalFiles);
Review Comment:
Don't we need column name too?
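For reference, in the removed code above the column names travel separately via `columnsToIndex` and fan out per file inside the flatMap (`getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, true)`), so the tuple itself only identifies the file. A minimal sketch of that consumption pattern; `statsRecordsFor` is a hypothetical, simplified stand-in for `getColumnStatsRecords`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class ColumnStatsTupleSketch {
  // Hypothetical record-generation hook standing in for getColumnStatsRecords(...)
  // in the diff, which receives the column list separately from the file tuple.
  static Stream<String> statsRecordsFor(String partitionId, String filePath,
                                        List<String> columnsToIndex, boolean isDeleted) {
    return columnsToIndex.stream()
        .map(col -> String.format("stats(%s, %s, %s, deleted=%b)",
            partitionId, filePath, col, isDeleted));
  }

  public static void main(String[] args) {
    // The (partition, filename, isDeleted) tuple identifies the file; one
    // record per indexed column is generated here, as in the removed code.
    List<String> columnsToIndex = Arrays.asList("col_a", "col_b");
    statsRecordsFor("p1", "p1/f1.parquet", columnsToIndex, true)
        .forEach(System.out::println);
  }
}
```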