This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bce55f0c165 [HUDI-6300] Fix file size parallelism not work when init metadata table (#8856)
bce55f0c165 is described below

commit bce55f0c1651949a1dfddaaf343d62cf76574063
Author: KnightChess <[email protected]>
AuthorDate: Wed Jul 19 10:26:13 2023 +0800

    [HUDI-6300] Fix file size parallelism not work when init metadata table (#8856)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 140 ++++++++++-----------
 1 file changed, 66 insertions(+), 74 deletions(-)
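
For context on the fix: both record-generation paths previously parallelized over one element per partition (a partition name paired with its whole file list), so the effective parallelism was capped at the number of partitions regardless of the configured bloom-index or column-stats parallelism. The patch flattens the input into one (partition, file) pair per file before calling engineContext.parallelize. A minimal standalone sketch of that flattening pattern, using java.util types in place of Hudi's Pair and HoodieData (class and variable names here are illustrative, not from the patch):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class FlattenSketch {
      // Expand a partition -> files map into one (partition, file) entry per
      // file, so a subsequent parallelize() can split the work per file
      // instead of being capped at the number of partitions.
      static List<SimpleEntry<String, String>> flatten(Map<String, List<String>> partitionToFiles) {
        return partitionToFiles.entrySet().stream()
            .flatMap(e -> e.getValue().stream().map(f -> new SimpleEntry<>(e.getKey(), f)))
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        Map<String, List<String>> partitionToFiles = Map.of(
            "2023/07/18", List.of("a.parquet", "b.parquet"),
            "2023/07/19", List.of("c.parquet"));
        // Before: at most 2 work items (one per partition).
        // After: 3 work items (one per file).
        flatten(partitionToFiles).forEach(p ->
            System.out.println(p.getKey() + " -> " + p.getValue()));
      }
    }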

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index cd87f6ff59c..56f478e781c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -86,6 +86,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -850,59 +851,56 @@ public class HoodieTableMetadataUtil {
                                                                           String instantTime) {
     HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
 
-    List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
-        .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
-    int parallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
-    HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList, parallelism);
+    List<Pair<String, String>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream().flatMap(entry -> {
+      return entry.getValue().stream().map(file -> Pair.of(entry.getKey(), file));
+    }).collect(Collectors.toList());
 
-    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
-      final String partitionName = partitionToDeletedFilesPair.getLeft();
-      final List<String> deletedFileList = partitionToDeletedFilesPair.getRight();
-      return deletedFileList.stream().flatMap(deletedFile -> {
-        if (!FSUtils.isBaseFile(new Path(deletedFile))) {
-          return Stream.empty();
-        }
+    int parallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
+    HoodieData<Pair<String, String>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList, parallelism);
 
-        final String partition = getPartitionIdentifier(partitionName);
-        return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
-            partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
-      }).iterator();
-    });
+    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.map(partitionToDeletedFilePair -> {
+      String partitionName = partitionToDeletedFilePair.getLeft();
+      String deletedFile = partitionToDeletedFilePair.getRight();
+      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+        return null;
+      }
+      final String partition = getPartitionIdentifier(partitionName);
+      return (HoodieRecord) (HoodieMetadataPayload.createBloomFilterMetadataRecord(
+          partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
+    }).filter(Objects::nonNull);
     allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
 
-    List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet()
-        .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+    List<Pair<String, String>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream().flatMap(entry -> {
+      return entry.getValue().keySet().stream().map(file -> Pair.of(entry.getKey(), file));
+    }).collect(Collectors.toList());
+
     parallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
-    HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList, parallelism);
+    HoodieData<Pair<String, String>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList, parallelism);
 
-    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
-      final String partitionName = partitionToAppendedFilesPair.getLeft();
-      final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight();
-      final String partition = getPartitionIdentifier(partitionName);
-      return appendedFileMap.entrySet().stream().flatMap(appendedFileLengthPairEntry -> {
-        final String appendedFile = appendedFileLengthPairEntry.getKey();
-        if (!FSUtils.isBaseFile(new Path(appendedFile))) {
-          return Stream.empty();
-        }
-        final String pathWithPartition = partitionName + "/" + appendedFile;
-        final Path appendedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
-        try (HoodieFileReader fileReader =
-                 HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), appendedFilePath)) {
-          final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
-          if (fileBloomFilter == null) {
-            LOG.error("Failed to read bloom filter for " + appendedFilePath);
-            return Stream.empty();
-          }
-          ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
-          HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(
-              partition, appendedFile, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false);
-          return Stream.of(record);
-        } catch (IOException e) {
-          LOG.error("Failed to get bloom filter for file: " + appendedFilePath);
+    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.map(partitionToAppendedFilesPair -> {
+      String partitionName = partitionToAppendedFilesPair.getLeft();
+      String appendedFile = partitionToAppendedFilesPair.getRight();
+      String partition = getPartitionIdentifier(partitionName);
+      if (!FSUtils.isBaseFile(new Path(appendedFile))) {
+        return null;
+      }
+      final String pathWithPartition = partitionName + "/" + appendedFile;
+      final Path appendedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
+      try (HoodieFileReader fileReader =
+               HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), appendedFilePath)) {
+        final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+        if (fileBloomFilter == null) {
+          LOG.error("Failed to read bloom filter for " + appendedFilePath);
+          return null;
         }
-        return Stream.empty();
-      }).iterator();
-    });
+        ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+        return (HoodieRecord) (HoodieMetadataPayload.createBloomFilterMetadataRecord(
+            partition, appendedFile, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false));
+      } catch (IOException e) {
+        LOG.error("Failed to get bloom filter for file: " + appendedFilePath);
+      }
+      return null;
+    }).filter(Objects::nonNull);
     allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
 
     return allRecordsRDD;
@@ -927,48 +925,42 @@ public class HoodieTableMetadataUtil {
       return engineContext.emptyHoodieData();
     }
 
-    final List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream()
-        .map(e -> Pair.of(e.getKey(), e.getValue()))
-        .collect(Collectors.toList());
+    List<Pair<String, String>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream().flatMap(entry -> {
+      return entry.getValue().stream().map(file -> Pair.of(entry.getKey(), file));
+    }).collect(Collectors.toList());
 
     int deletedFilesTargetParallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    final HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD =
+    final HoodieData<Pair<String, String>> partitionToDeletedFilesRDD =
         engineContext.parallelize(partitionToDeletedFilesList, deletedFilesTargetParallelism);
 
    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
-      final String partitionPath = partitionToDeletedFilesPair.getLeft();
-      final String partitionId = getPartitionIdentifier(partitionPath);
-      final List<String> deletedFileList = partitionToDeletedFilesPair.getRight();
-
-      return deletedFileList.stream().flatMap(deletedFile -> {
-        final String filePathWithPartition = partitionPath + "/" + deletedFile;
-        return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, true);
-      }).iterator();
+      String partitionPath = partitionToDeletedFilesPair.getLeft();
+      String partitionId = getPartitionIdentifier(partitionPath);
+      String deletedFile = partitionToDeletedFilesPair.getRight();
+      String filePathWithPartition = partitionPath + "/" + deletedFile;
+      return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, true).iterator();
     });
 
     allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
 
-    final List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream()
-        .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
-        .collect(Collectors.toList());
+    List<Pair<String, String>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream().flatMap(entry -> {
+      return entry.getValue().keySet().stream().map(file -> Pair.of(entry.getKey(), file));
+    }).collect(Collectors.toList());
 
     int appendedFilesTargetParallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    final HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD =
+    final HoodieData<Pair<String, String>> partitionToAppendedFilesRDD =
         engineContext.parallelize(partitionToAppendedFilesList, appendedFilesTargetParallelism);
 
    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
-      final String partitionPath = partitionToAppendedFilesPair.getLeft();
-      final String partitionId = getPartitionIdentifier(partitionPath);
-      final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight();
-
-      return appendedFileMap.entrySet().stream().flatMap(appendedFileNameLengthEntry -> {
-        if (!FSUtils.isBaseFile(new Path(appendedFileNameLengthEntry.getKey()))
-            || !appendedFileNameLengthEntry.getKey().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-          return Stream.empty();
-        }
-        final String filePathWithPartition = partitionPath + "/" + appendedFileNameLengthEntry.getKey();
-        return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, false);
-      }).iterator();
+      String partitionPath = partitionToAppendedFilesPair.getLeft();
+      String partitionId = getPartitionIdentifier(partitionPath);
+      String appendedFile = partitionToAppendedFilesPair.getRight();
+      if (!FSUtils.isBaseFile(new Path(appendedFile))
+          || !appendedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+        return Stream.<HoodieRecord>empty().iterator();
+      }
+      final String filePathWithPartition = partitionPath + "/" + appendedFile;
+      return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, false).iterator();
     });
 
     allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
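
A note on the bloom-filter path above: HoodieData.map must emit exactly one output per input, so the rewrite marks skipped files by returning null and strips them afterwards with .filter(Objects::nonNull), which is why the java.util.Objects import is added. A small java.util.stream analogue of that map-then-filter pattern, with a ".parquet" suffix check standing in for FSUtils.isBaseFile (names are illustrative, not from the patch):

    import java.util.List;
    import java.util.Objects;
    import java.util.stream.Collectors;

    public class MapFilterSketch {
      public static void main(String[] args) {
        List<String> files = List.of("f1.parquet", "f2.log", "f3.parquet");
        // map emits null for files to skip; filter(Objects::nonNull) drops
        // them, mirroring the .map(...).filter(Objects::nonNull) chain on
        // HoodieData in the patch.
        List<String> baseFiles = files.stream()
            .map(f -> f.endsWith(".parquet") ? f : null) // stand-in for FSUtils.isBaseFile
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
        System.out.println(baseFiles); // [f1.parquet, f3.parquet]
      }
    }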
