nsivabalan commented on code in PR #9223:
URL: https://github.com/apache/hudi/pull/9223#discussion_r1290863005


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -848,64 +851,49 @@ public static HoodieData<HoodieRecord> 
convertFilesToBloomFilterRecords(HoodieEn
                                                                           
Map<String, Map<String, Long>> partitionToAppendedFiles,
                                                                           
MetadataRecordsGenerationParams recordsGenerationParams,
                                                                           
String instantTime) {
-    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
-
-    List<Pair<String, List<String>>> partitionToDeletedFilesList = 
partitionToDeletedFiles.entrySet()
-        .stream().map(e -> Pair.of(e.getKey(), 
e.getValue())).collect(Collectors.toList());
-    int parallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), 
recordsGenerationParams.getBloomIndexParallelism()), 1);
-    HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = 
engineContext.parallelize(partitionToDeletedFilesList, parallelism);
-
-    HoodieData<HoodieRecord> deletedFilesRecordsRDD = 
partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
-      final String partitionName = partitionToDeletedFilesPair.getLeft();
-      final List<String> deletedFileList = 
partitionToDeletedFilesPair.getRight();
-      return deletedFileList.stream().flatMap(deletedFile -> {
-        if (!FSUtils.isBaseFile(new Path(deletedFile))) {
-          return Stream.empty();
-        }
-
-        final String partition = getPartitionIdentifier(partitionName);
-        return 
Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
-            partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, 
ByteBuffer.allocate(0), true));
-      }).iterator();
-    });
-    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
+    // Total number of files which are added or deleted
+    final int totalFiles = 
partitionToDeletedFiles.values().stream().mapToInt(List::size).sum()
+        + partitionToAppendedFiles.values().stream().mapToInt(Map::size).sum();
+
+    // Create the tuple (partition, filename, isDeleted) to handle both 
deletes and appends
+    final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = 
new ArrayList<>(totalFiles);
+    partitionToDeletedFiles.entrySet().stream()
+        .flatMap(entry -> entry.getValue().stream().map(deletedFile -> new 
Tuple3<>(entry.getKey(), deletedFile, true)))
+        .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
+    partitionToAppendedFiles.entrySet().stream()
+        .flatMap(entry -> entry.getValue().keySet().stream().map(addedFile -> 
new Tuple3<>(entry.getKey(), addedFile, false)))
+        .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));

Review Comment:
   There are some minor differences between column stats and bloom filter with respect to log file 
handling, so maybe we can leave it as is. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to