nsivabalan commented on code in PR #9223:
URL: https://github.com/apache/hudi/pull/9223#discussion_r1290863860
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -848,64 +851,49 @@ public static HoodieData<HoodieRecord>
convertFilesToBloomFilterRecords(HoodieEn
Map<String, Map<String, Long>> partitionToAppendedFiles,
MetadataRecordsGenerationParams recordsGenerationParams,
String instantTime) {
- HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
-
- List<Pair<String, List<String>>> partitionToDeletedFilesList =
partitionToDeletedFiles.entrySet()
- .stream().map(e -> Pair.of(e.getKey(),
e.getValue())).collect(Collectors.toList());
- int parallelism = Math.max(Math.min(partitionToDeletedFilesList.size(),
recordsGenerationParams.getBloomIndexParallelism()), 1);
- HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD =
engineContext.parallelize(partitionToDeletedFilesList, parallelism);
-
- HoodieData<HoodieRecord> deletedFilesRecordsRDD =
partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
- final String partitionName = partitionToDeletedFilesPair.getLeft();
- final List<String> deletedFileList =
partitionToDeletedFilesPair.getRight();
- return deletedFileList.stream().flatMap(deletedFile -> {
- if (!FSUtils.isBaseFile(new Path(deletedFile))) {
- return Stream.empty();
- }
-
- final String partition = getPartitionIdentifier(partitionName);
- return
Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
- partition, deletedFile, instantTime, StringUtils.EMPTY_STRING,
ByteBuffer.allocate(0), true));
- }).iterator();
- });
- allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
+ // Total number of files which are added or deleted
+ final int totalFiles =
partitionToDeletedFiles.values().stream().mapToInt(List::size).sum()
+ + partitionToAppendedFiles.values().stream().mapToInt(Map::size).sum();
+
+ // Create the tuple (partition, filename, isDeleted) to handle both
deletes and appends
+ final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList =
new ArrayList<>(totalFiles);
+ partitionToDeletedFiles.entrySet().stream()
+ .flatMap(entry -> entry.getValue().stream().map(deletedFile -> new
Tuple3<>(entry.getKey(), deletedFile, true)))
+ .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
+ partitionToAppendedFiles.entrySet().stream()
+ .flatMap(entry -> entry.getValue().keySet().stream().map(addedFile ->
new Tuple3<>(entry.getKey(), addedFile, false)))
+ .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
+
+ // Create records MDT
+ int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(),
recordsGenerationParams.getBloomIndexParallelism()), 1);
+ return engineContext.parallelize(partitionFileFlagTupleList,
parallelism).flatMap(partitionFileFlagTuple -> {
+ final String partitionName = partitionFileFlagTuple._1();
+ final String filename = partitionFileFlagTuple._2();
+ final boolean isDeleted = partitionFileFlagTuple._3();
+ if (!FSUtils.isBaseFile(new Path(filename))) {
+ LOG.warn(String.format("Ignoring file %s as it is not a base file",
filename));
+ return Stream.<HoodieRecord>empty().iterator();
+ }
- List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList =
partitionToAppendedFiles.entrySet()
- .stream().map(entry -> Pair.of(entry.getKey(),
entry.getValue())).collect(Collectors.toList());
- parallelism = Math.max(Math.min(partitionToAppendedFilesList.size(),
recordsGenerationParams.getBloomIndexParallelism()), 1);
- HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD =
engineContext.parallelize(partitionToAppendedFilesList, parallelism);
+ // Read the bloom filter from the base file if the file is being added
+ ByteBuffer bloomFilterBuffer = ByteBuffer.allocate(0);
+ if (!isDeleted) {
+ final String pathWithPartition = partitionName + "/" + filename;
+ final Path addedFilePath = new
Path(recordsGenerationParams.getDataMetaClient().getBasePath(),
pathWithPartition);
+ bloomFilterBuffer =
readBloomFilter(recordsGenerationParams.getDataMetaClient().getHadoopConf(),
addedFilePath);
+
+ // If reading the bloom filter failed then do not add a record for
this file
+ if (bloomFilterBuffer == null) {
+ LOG.error("Failed to read bloom filter from " + addedFilePath);
+ return Stream.<HoodieRecord>empty().iterator();
Review Comment:
Tracked in JIRA: https://issues.apache.org/jira/browse/HUDI-6684
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]