codope commented on a change in pull request #4746:
URL: https://github.com/apache/hudi/pull/4746#discussion_r800530826



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -667,34 +660,60 @@ private static void 
processRollbackMetadata(HoodieActiveTimeline metadataTableTi
   }
 
   /**
-   * Convert rollback action metadata to column stats index records.
+   * Convert added and deleted action metadata to column stats index records.
    */
-  private static List<HoodieRecord> 
convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
-                                                                     
HoodieTableMetaClient datasetMetaClient,
-                                                                     
Map<String, List<String>> partitionToDeletedFiles,
-                                                                     
Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                                                     String 
instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    List<String> latestColumns = getLatestColumns(datasetMetaClient);
-    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> 
deletedFileList.forEach(deletedFile -> {
+  public static HoodieData<HoodieRecord> 
convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
+                                                                          
Map<String, List<String>> partitionToDeletedFiles,
+                                                                          
Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                          
MetadataRecordsGenerationParams recordsGenerationParams) {
+    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+    final List<String> latestColumns = 
getLatestColumns(recordsGenerationParams.getDataMetaClient());
+
+    final List<Pair<String, List<String>>> partitionToDeletedFilesList = 
partitionToDeletedFiles.entrySet()
+        .stream().map(e -> Pair.of(e.getKey(), 
e.getValue())).collect(Collectors.toList());
+    final HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = 
engineContext.parallelize(partitionToDeletedFilesList,
+        Math.max(partitionToDeletedFilesList.size(), 
recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> deletedFilesRecordsRDD = 
partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesEntry -> {
+      final String partitionName = partitionToDeletedFilesEntry.getLeft();
       final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName;
-      if (deletedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+      final List<String> deletedFileList = 
partitionToDeletedFilesEntry.getRight();
+
+      return deletedFileList.stream().flatMap(deletedFile -> {
+        if (!FSUtils.isBaseFile(new Path(deletedFile)) || 
!deletedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+          return Stream.empty();
+        }
         final String filePathWithPartition = partitionName + "/" + deletedFile;
-        records.addAll(getColumnStats(partition, filePathWithPartition, 
datasetMetaClient,
-            latestColumns, true).collect(Collectors.toList()));
-      }
-    }));
-
-    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> 
appendedFileMap.forEach(
-        (appendedFile, size) -> {
-          final String partition = partitionName.equals(EMPTY_PARTITION_NAME) 
? NON_PARTITIONED_NAME : partitionName;
-          if 
(appendedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-            final String filePathWithPartition = partitionName + "/" + 
appendedFile;
-            records.addAll(getColumnStats(partition, filePathWithPartition, 
datasetMetaClient,
-                latestColumns, false).collect(Collectors.toList()));
-          }
-        }));
-    return records;
+        return getColumnStats(partition, filePathWithPartition, 
recordsGenerationParams.getDataMetaClient(),
+            latestColumns, true);
+      }).iterator();
+    });
+    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
+
+    final List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = 
partitionToAppendedFiles.entrySet()
+        .stream().map(entry -> Pair.of(entry.getKey(), 
entry.getValue())).collect(Collectors.toList());
+    final HoodieData<Pair<String, Map<String, Long>>> 
partitionToAppendedFilesRDD = 
engineContext.parallelize(partitionToAppendedFilesList,
+        Math.max(partitionToAppendedFiles.size(), 
recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> appendedFilesRecordsRDD = 
partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesEntry -> {
+      final String partitionName = partitionToAppendedFilesEntry.getLeft();
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName;
+      final Map<String, Long> appendedFileMap = 
partitionToAppendedFilesEntry.getRight();
+
+      return 
appendedFileMap.entrySet().stream().flatMap(appendedFileNameLengthPair -> {
+        if (!FSUtils.isBaseFile(new Path(appendedFileNameLengthPair.getKey()))
+            || 
!appendedFileNameLengthPair.getKey().endsWith(HoodieFileFormat.PARQUET.getFileExtension()))
 {

Review comment:
       I think we should extract this if-condition into a helper method; the 
same base-file/Parquet check is already repeated for the deleted-files path 
above. I know that right now we only collect stats from base Parquet files, 
but that might change tomorrow.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to