This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bce55f0c165 [HUDI-6300] Fix file size parallelism not work when init metadata table (#8856)
bce55f0c165 is described below
commit bce55f0c1651949a1dfddaaf343d62cf76574063
Author: KnightChess <[email protected]>
AuthorDate: Wed Jul 19 10:26:13 2023 +0800
[HUDI-6300] Fix file size parallelism not work when init metadata table (#8856)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/metadata/HoodieTableMetadataUtil.java | 140 ++++++++++-----------
1 file changed, 66 insertions(+), 74 deletions(-)
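For context on the fix: both record-generation paths previously called engineContext.parallelize() with one element per partition (the partition name paired with its whole file list), so the effective parallelism was capped by the number of touched partitions rather than by the configured bloom-index / column-stats parallelism. The patch flattens each map into one (partition, file) pair per file before parallelizing. Below is a minimal, self-contained Java sketch of that flattening; it uses java.util.Map.Entry in place of Hudi's Pair and made-up file names, and is only an illustration of the idea, not code from the commit.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.stream.Collectors;

    public class FileParallelismSketch {
      public static void main(String[] args) {
        // Hypothetical input: two partitions holding three files and one file respectively.
        Map<String, List<String>> partitionToFiles = Map.of(
            "2023/07/18", Arrays.asList("a.parquet", "b.parquet", "c.parquet"),
            "2023/07/19", Arrays.asList("d.parquet"));

        // Old shape: one element per partition, so parallelize() is capped at 2 tasks here.
        List<Entry<String, List<String>>> perPartition =
            partitionToFiles.entrySet().stream().collect(Collectors.toList());

        // New shape: one element per (partition, file) pair, so up to 4 tasks can be used.
        List<Entry<String, String>> perFile = partitionToFiles.entrySet().stream()
            .flatMap(e -> e.getValue().stream().map(f -> Map.entry(e.getKey(), f)))
            .collect(Collectors.toList());

        int configuredParallelism = 200; // stand-in for getBloomIndexParallelism()
        int oldParallelism = Math.max(Math.min(perPartition.size(), configuredParallelism), 1);
        int newParallelism = Math.max(Math.min(perFile.size(), configuredParallelism), 1);
        System.out.println("old=" + oldParallelism + ", new=" + newParallelism); // old=2, new=4
      }
    }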
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index cd87f6ff59c..56f478e781c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -86,6 +86,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -850,59 +851,56 @@ public class HoodieTableMetadataUtil {
                                                            String instantTime) {
     HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
-    List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
-        .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
-    int parallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
-    HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList, parallelism);
+    List<Pair<String, String>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream().flatMap(entry -> {
+      return entry.getValue().stream().map(file -> Pair.of(entry.getKey(), file));
+    }).collect(Collectors.toList());
-    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
-      final String partitionName = partitionToDeletedFilesPair.getLeft();
-      final List<String> deletedFileList = partitionToDeletedFilesPair.getRight();
-      return deletedFileList.stream().flatMap(deletedFile -> {
-        if (!FSUtils.isBaseFile(new Path(deletedFile))) {
-          return Stream.empty();
-        }
+    int parallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
+    HoodieData<Pair<String, String>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList, parallelism);
-        final String partition = getPartitionIdentifier(partitionName);
-        return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
-            partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
-      }).iterator();
-    });
+    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.map(partitionToDeletedFilePair -> {
+      String partitionName = partitionToDeletedFilePair.getLeft();
+      String deletedFile = partitionToDeletedFilePair.getRight();
+      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+        return null;
+      }
+      final String partition = getPartitionIdentifier(partitionName);
+      return (HoodieRecord) (HoodieMetadataPayload.createBloomFilterMetadataRecord(
+          partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
+    }).filter(Objects::nonNull);
     allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
-    List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet()
-        .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+    List<Pair<String, String>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream().flatMap(entry -> {
+      return entry.getValue().keySet().stream().map(file -> Pair.of(entry.getKey(), file));
+    }).collect(Collectors.toList());
+
     parallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
-    HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList, parallelism);
+    HoodieData<Pair<String, String>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList, parallelism);
-    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
-      final String partitionName = partitionToAppendedFilesPair.getLeft();
-      final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight();
-      final String partition = getPartitionIdentifier(partitionName);
-      return appendedFileMap.entrySet().stream().flatMap(appendedFileLengthPairEntry -> {
-        final String appendedFile = appendedFileLengthPairEntry.getKey();
-        if (!FSUtils.isBaseFile(new Path(appendedFile))) {
-          return Stream.empty();
-        }
-        final String pathWithPartition = partitionName + "/" + appendedFile;
-        final Path appendedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
-        try (HoodieFileReader fileReader =
-                 HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), appendedFilePath)) {
-          final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
-          if (fileBloomFilter == null) {
-            LOG.error("Failed to read bloom filter for " + appendedFilePath);
-            return Stream.empty();
-          }
-          ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
-          HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(
-              partition, appendedFile, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false);
-          return Stream.of(record);
-        } catch (IOException e) {
-          LOG.error("Failed to get bloom filter for file: " + appendedFilePath);
+    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.map(partitionToAppendedFilesPair -> {
+      String partitionName = partitionToAppendedFilesPair.getLeft();
+      String appendedFile = partitionToAppendedFilesPair.getRight();
+      String partition = getPartitionIdentifier(partitionName);
+      if (!FSUtils.isBaseFile(new Path(appendedFile))) {
+        return null;
+      }
+      final String pathWithPartition = partitionName + "/" + appendedFile;
+      final Path appendedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
+      try (HoodieFileReader fileReader =
+               HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), appendedFilePath)) {
+        final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+        if (fileBloomFilter == null) {
+          LOG.error("Failed to read bloom filter for " + appendedFilePath);
+          return null;
         }
-        return Stream.empty();
-      }).iterator();
-    });
+        ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+        return (HoodieRecord) (HoodieMetadataPayload.createBloomFilterMetadataRecord(
+            partition, appendedFile, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false));
+      } catch (IOException e) {
+        LOG.error("Failed to get bloom filter for file: " + appendedFilePath);
+      }
+      return null;
+    }).filter(Objects::nonNull);
     allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
     return allRecordsRDD;
@@ -927,48 +925,42 @@ public class HoodieTableMetadataUtil {
       return engineContext.emptyHoodieData();
     }
-    final List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream()
-        .map(e -> Pair.of(e.getKey(), e.getValue()))
-        .collect(Collectors.toList());
+    List<Pair<String, String>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream().flatMap(entry -> {
+      return entry.getValue().stream().map(file -> Pair.of(entry.getKey(), file));
+    }).collect(Collectors.toList());
     int deletedFilesTargetParallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    final HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD =
+    final HoodieData<Pair<String, String>> partitionToDeletedFilesRDD =
         engineContext.parallelize(partitionToDeletedFilesList, deletedFilesTargetParallelism);
     HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
-      final String partitionPath = partitionToDeletedFilesPair.getLeft();
-      final String partitionId = getPartitionIdentifier(partitionPath);
-      final List<String> deletedFileList = partitionToDeletedFilesPair.getRight();
-
-      return deletedFileList.stream().flatMap(deletedFile -> {
-        final String filePathWithPartition = partitionPath + "/" + deletedFile;
-        return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, true);
-      }).iterator();
+      String partitionPath = partitionToDeletedFilesPair.getLeft();
+      String partitionId = getPartitionIdentifier(partitionPath);
+      String deletedFile = partitionToDeletedFilesPair.getRight();
+      String filePathWithPartition = partitionPath + "/" + deletedFile;
+      return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, true).iterator();
     });
     allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
-    final List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream()
-        .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
-        .collect(Collectors.toList());
+    List<Pair<String, String>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream().flatMap(entry -> {
+      return entry.getValue().keySet().stream().map(file -> Pair.of(entry.getKey(), file));
+    }).collect(Collectors.toList());
     int appendedFilesTargetParallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    final HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD =
+    final HoodieData<Pair<String, String>> partitionToAppendedFilesRDD =
         engineContext.parallelize(partitionToAppendedFilesList, appendedFilesTargetParallelism);
     HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
-      final String partitionPath = partitionToAppendedFilesPair.getLeft();
-      final String partitionId = getPartitionIdentifier(partitionPath);
-      final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight();
-
-      return appendedFileMap.entrySet().stream().flatMap(appendedFileNameLengthEntry -> {
-        if (!FSUtils.isBaseFile(new Path(appendedFileNameLengthEntry.getKey()))
-            || !appendedFileNameLengthEntry.getKey().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-          return Stream.empty();
-        }
-        final String filePathWithPartition = partitionPath + "/" + appendedFileNameLengthEntry.getKey();
-        return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, false);
-      }).iterator();
+      String partitionPath = partitionToAppendedFilesPair.getLeft();
+      String partitionId = getPartitionIdentifier(partitionPath);
+      String appendedFile = partitionToAppendedFilesPair.getRight();
+      if (!FSUtils.isBaseFile(new Path(appendedFile))
+          || !appendedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+        return Stream.<HoodieRecord>empty().iterator();
+      }
+      final String filePathWithPartition = partitionPath + "/" + appendedFile;
+      return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, false).iterator();
     });
     allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
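One more note on the bloom-filter hunk above: since each RDD element is now a single file rather than a list of files, the per-file lambda can no longer skip non-base files or unreadable bloom filters by returning an empty stream; it returns null and the nulls are dropped afterwards with .filter(Objects::nonNull) (hence the new java.util.Objects import). A plain java.util.stream sketch of that pattern, with illustrative names only:

    import java.util.List;
    import java.util.Objects;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class SkipViaNullSketch {
      public static void main(String[] args) {
        List<String> files = List.of("f1.parquet", "f2.log", "f3.parquet");

        // Old style: the per-element lambda returns a (possibly empty) Stream that flatMap flattens away.
        List<String> viaFlatMap = files.stream()
            .flatMap(f -> f.endsWith(".parquet") ? Stream.of("record:" + f) : Stream.<String>empty())
            .collect(Collectors.toList());

        // New style: the per-element lambda returns a record or null, and nulls are filtered out,
        // mirroring partitionToDeletedFilesRDD.map(...).filter(Objects::nonNull) in the diff.
        List<String> viaMapAndFilter = files.stream()
            .map(f -> f.endsWith(".parquet") ? "record:" + f : null)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());

        System.out.println(viaFlatMap.equals(viaMapAndFilter)); // true
      }
    }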