This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 05ac0113165 [HUDI-6553] Speedup column stats and bloom index creation on large datasets. (#9223)
05ac0113165 is described below
commit 05ac011316564f97de178b023e8e93ff768c37a4
Author: Prashant Wason <[email protected]>
AuthorDate: Fri Aug 11 09:37:19 2023 -0700
[HUDI-6553] Speedup column stats and bloom index creation on large datasets. (#9223)
* [HUDI-6553] Speedup column stats and bloom index creation on large datasets.
* addressing feedback
* Fix log message
---------
Co-authored-by: sivabalan <[email protected]>
Co-authored-by: Sagar Sumit <[email protected]>
---
.../metadata/HoodieBackedTableMetadataWriter.java | 6 +
hudi-common/pom.xml | 7 +
.../hudi/metadata/HoodieTableMetadataUtil.java | 160 +++++++++------------
3 files changed, 84 insertions(+), 89 deletions(-)
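The core change, visible in the HoodieTableMetadataUtil diff below, replaces two separate record-generation passes (one over deleted files, one over appended files, each parallelized and then unioned) with a single pass over a merged list of (partition, filename, isDeleted) triplets, so only one parallelism calculation and one distributed pass are needed. A minimal sketch of that merge, using a hypothetical FileFlag class in place of the scala.Tuple3 the patch actually uses (all names here are illustrative, not the committed API):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class TripletSketch {
      // Illustrative stand-in for scala.Tuple3<String, String, Boolean>
      static class FileFlag {
        final String partition;
        final String file;
        final boolean isDeleted;
        FileFlag(String partition, String file, boolean isDeleted) {
          this.partition = partition;
          this.file = file;
          this.isDeleted = isDeleted;
        }
      }

      // Merge deleted and appended files into one pre-sized list, mirroring
      // fetchPartitionFileInfoTriplets in the patch below.
      static List<FileFlag> merge(Map<String, List<String>> deleted,
                                  Map<String, Map<String, Long>> appended) {
        int total = deleted.values().stream().mapToInt(List::size).sum()
            + appended.values().stream().mapToInt(Map::size).sum();
        List<FileFlag> out = new ArrayList<>(total);
        deleted.forEach((p, files) -> files.forEach(f -> out.add(new FileFlag(p, f, true))));
        appended.forEach((p, files) -> files.keySet().forEach(f -> out.add(new FileFlag(p, f, false))));
        return out;
      }
    }

Pre-sizing the list avoids repeated ArrayList growth on large datasets, which is part of the speedup this commit targets.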
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 74d8ae16176..e99ec493558 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -431,6 +431,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
+ " bootstrap failed for " + metadataMetaClient.getBasePath(), e);
}
+ LOG.info(String.format("Initializing %s index with %d file groups and %d mappings.", partitionType.name(), fileGroupCountAndRecordsPair.getKey(),
+ fileGroupCountAndRecordsPair.getValue().count()));
+ HoodieTimer partitionInitTimer = HoodieTimer.start();
+
// Generate the file groups
final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType.name() + " should be > 0");
@@ -443,6 +447,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, true);
// initialize the metadata reader again so the MDT partition can be read after initialization
initMetadataReader();
+ long totalInitTime = partitionInitTimer.endTimer();
+ LOG.info(String.format("Initializing %s index in metadata table took %d ms.", partitionType.name(), totalInitTime));
}
return true;
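The hunk above brackets partition initialization with HoodieTimer, Hudi's small stopwatch utility. A minimal usage sketch, assuming only the HoodieTimer calls that appear in this diff (start() and endTimer()):

    import org.apache.hudi.common.util.HoodieTimer;

    public class TimerSketch {
      public static void main(String[] args) {
        HoodieTimer timer = HoodieTimer.start(); // start the stopwatch
        doWork();                                // the section being measured
        long elapsedMs = timer.endTimer();       // elapsed time in milliseconds
        System.out.println("took " + elapsedMs + " ms");
      }

      static void doWork() { /* placeholder for the measured work */ }
    }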
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index 5ef860e81a9..6eccbf9537d 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -103,6 +103,13 @@
</build>
<dependencies>
+ <!-- Scala -->
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
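The scala-library dependency is added so that plain Java code in hudi-common can use scala.Tuple3, which is imported in the next diff. Tuple3 is directly usable from Java via its constructor and the _1(), _2(), _3() accessors; a small sketch with made-up values:

    import scala.Tuple3;

    public class Tuple3Sketch {
      public static void main(String[] args) {
        // (partition, filename, isDeleted), as used by the patch below
        Tuple3<String, String, Boolean> t = new Tuple3<>("2023/08/11", "f1.parquet", false);
        System.out.println(t._1() + "/" + t._2() + " deleted=" + t._3());
      }
    }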
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index b50ff114250..08fc663fbad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -18,6 +18,7 @@
package org.apache.hudi.metadata;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.ConvertingGenericData;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
@@ -87,7 +88,6 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -95,6 +95,8 @@ import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import scala.Tuple3;
+
import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
import static org.apache.hudi.avro.HoodieAvroUtils.convertValueForSpecificDataTypes;
@@ -787,61 +789,39 @@ public class HoodieTableMetadataUtil {
Map<String, Map<String, Long>> partitionToAppendedFiles,
MetadataRecordsGenerationParams recordsGenerationParams,
String instantTime) {
- HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
-
- List<Pair<String, String>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream().flatMap(entry -> {
- return entry.getValue().stream().map(file -> Pair.of(entry.getKey(), file));
- }).collect(Collectors.toList());
-
- int parallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
- HoodieData<Pair<String, String>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList, parallelism);
-
- HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.map(partitionToDeletedFilePair -> {
- String partitionName = partitionToDeletedFilePair.getLeft();
- String deletedFile = partitionToDeletedFilePair.getRight();
- if (!FSUtils.isBaseFile(new Path(deletedFile))) {
- return null;
- }
- final String partition = getPartitionIdentifier(partitionName);
- return (HoodieRecord) (HoodieMetadataPayload.createBloomFilterMetadataRecord(
- partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
- }).filter(Objects::nonNull);
- allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
-
- List<Pair<String, String>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream().flatMap(entry -> {
- return entry.getValue().keySet().stream().map(file -> Pair.of(entry.getKey(), file));
- }).collect(Collectors.toList());
-
- parallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
- HoodieData<Pair<String, String>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList, parallelism);
-
- HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.map(partitionToAppendedFilesPair -> {
- String partitionName = partitionToAppendedFilesPair.getLeft();
- String appendedFile = partitionToAppendedFilesPair.getRight();
- String partition = getPartitionIdentifier(partitionName);
- if (!FSUtils.isBaseFile(new Path(appendedFile))) {
- return null;
+ // Create the tuple (partition, filename, isDeleted) to handle both deletes and appends
+ final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = fetchPartitionFileInfoTriplets(partitionToDeletedFiles, partitionToAppendedFiles);
+
+ // Create records for the MDT
+ int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
+ return engineContext.parallelize(partitionFileFlagTupleList, parallelism).flatMap(partitionFileFlagTuple -> {
+ final String partitionName = partitionFileFlagTuple._1();
+ final String filename = partitionFileFlagTuple._2();
+ final boolean isDeleted = partitionFileFlagTuple._3();
+ if (!FSUtils.isBaseFile(new Path(filename))) {
+ LOG.warn(String.format("Ignoring file %s as it is not a base file", filename));
+ return Stream.<HoodieRecord>empty().iterator();
}
- final String pathWithPartition = partitionName + "/" + appendedFile;
- final Path appendedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
- try (HoodieFileReader fileReader =
- HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), appendedFilePath)) {
- final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
- if (fileBloomFilter == null) {
- LOG.error("Failed to read bloom filter for " + appendedFilePath);
- return null;
+
+ // Read the bloom filter from the base file if the file is being added
+ ByteBuffer bloomFilterBuffer = ByteBuffer.allocate(0);
+ if (!isDeleted) {
+ final String pathWithPartition = partitionName + "/" + filename;
+ final Path addedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
+ bloomFilterBuffer = readBloomFilter(recordsGenerationParams.getDataMetaClient().getHadoopConf(), addedFilePath);
+
+ // If reading the bloom filter failed then do not add a record for this file
+ if (bloomFilterBuffer == null) {
+ LOG.error("Failed to read bloom filter from " + addedFilePath);
+ return Stream.<HoodieRecord>empty().iterator();
}
- ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
- return (HoodieRecord) (HoodieMetadataPayload.createBloomFilterMetadataRecord(
- partition, appendedFile, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false));
- } catch (IOException e) {
- LOG.error("Failed to get bloom filter for file: " + appendedFilePath);
}
- return null;
- }).filter(Objects::nonNull);
- allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
- return allRecordsRDD;
+ final String partition = getPartitionIdentifier(partitionName);
+ return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+ partition, filename, instantTime, recordsGenerationParams.getBloomFilterType(), bloomFilterBuffer, isDeleted))
+ .iterator();
+ });
}
/**
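Both rewritten methods skip files that produce no record. The rewrite drops the old map-to-null-then-filter(Objects::nonNull) idiom (hence the removed java.util.Objects import) in favor of a flatMap lambda that returns an empty iterator to skip a file and a one-element iterator to emit its record. A standalone sketch of the idiom; the file-name check and record string are illustrative:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.stream.Stream;

    public class FlatMapSkipSketch {
      // Return zero or one element per input; flatMap flattens the result, so
      // no null sentinel and no trailing filter(Objects::nonNull) are needed.
      static Iterator<String> toRecord(String file) {
        if (!file.endsWith(".parquet")) {
          return Stream.<String>empty().iterator(); // skip: emit nothing
        }
        return Stream.of("record-for-" + file).iterator(); // emit one record
      }

      public static void main(String[] args) {
        List<String> files = Arrays.asList("a.parquet", "b.log", "c.parquet");
        files.forEach(f -> toRecord(f).forEachRemaining(System.out::println));
      }
    }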
@@ -851,59 +831,61 @@ public class HoodieTableMetadataUtil {
Map<String, List<String>> partitionToDeletedFiles,
Map<String, Map<String, Long>> partitionToAppendedFiles,
MetadataRecordsGenerationParams recordsGenerationParams) {
- HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+ // Find the columns to index
HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
-
final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
-
if (columnsToIndex.isEmpty()) {
// In case there are no columns to index, bail
return engineContext.emptyHoodieData();
}
- List<Pair<String, String>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream().flatMap(entry -> {
- return entry.getValue().stream().map(file -> Pair.of(entry.getKey(), file));
- }).collect(Collectors.toList());
-
- int deletedFilesTargetParallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
- final HoodieData<Pair<String, String>> partitionToDeletedFilesRDD =
- engineContext.parallelize(partitionToDeletedFilesList, deletedFilesTargetParallelism);
-
- HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
- String partitionPath = partitionToDeletedFilesPair.getLeft();
- String partitionId = getPartitionIdentifier(partitionPath);
- String deletedFile = partitionToDeletedFilesPair.getRight();
- String filePathWithPartition = partitionPath + "/" + deletedFile;
- return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, true).iterator();
- });
-
- allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
+ LOG.info(String.format("Indexing %d columns for column stats index", columnsToIndex.size()));
- List<Pair<String, String>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream().flatMap(entry -> {
- return entry.getValue().keySet().stream().map(file -> Pair.of(entry.getKey(), file));
- }).collect(Collectors.toList());
+ // Create the tuple (partition, filename, isDeleted) to handle both deletes and appends
+ final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = fetchPartitionFileInfoTriplets(partitionToDeletedFiles, partitionToAppendedFiles);
- int appendedFilesTargetParallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
- final HoodieData<Pair<String, String>> partitionToAppendedFilesRDD =
- engineContext.parallelize(partitionToAppendedFilesList, appendedFilesTargetParallelism);
-
- HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
- String partitionPath = partitionToAppendedFilesPair.getLeft();
- String partitionId = getPartitionIdentifier(partitionPath);
- String appendedFile = partitionToAppendedFilesPair.getRight();
- if (!FSUtils.isBaseFile(new Path(appendedFile))
- || !appendedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ // Create records for the MDT
+ int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
+ return engineContext.parallelize(partitionFileFlagTupleList, parallelism).flatMap(partitionFileFlagTuple -> {
+ final String partitionName = partitionFileFlagTuple._1();
+ final String filename = partitionFileFlagTuple._2();
+ final boolean isDeleted = partitionFileFlagTuple._3();
+ if (!FSUtils.isBaseFile(new Path(filename)) || !filename.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ LOG.warn(String.format("Ignoring file %s as it is not a PARQUET file", filename));
return Stream.<HoodieRecord>empty().iterator();
}
- final String filePathWithPartition = partitionPath + "/" + appendedFile;
- return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, false).iterator();
+
+ final String filePathWithPartition = partitionName + "/" + filename;
+ final String partitionId = getPartitionIdentifier(partitionName);
+ return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, isDeleted).iterator();
});
+ }
- allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
+ private static ByteBuffer readBloomFilter(Configuration conf, Path filePath) throws IOException {
+ try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(conf, filePath)) {
+ final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+ if (fileBloomFilter == null) {
+ return null;
+ }
+ return ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+ }
+ }
- return allRecordsRDD;
+ private static List<Tuple3<String, String, Boolean>> fetchPartitionFileInfoTriplets(Map<String, List<String>> partitionToDeletedFiles,
+ Map<String, Map<String, Long>> partitionToAppendedFiles) {
+ // Total number of files which are added or deleted
+ final int totalFiles = partitionToDeletedFiles.values().stream().mapToInt(List::size).sum()
+ + partitionToAppendedFiles.values().stream().mapToInt(Map::size).sum();
+ final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = new ArrayList<>(totalFiles);
+ partitionToDeletedFiles.entrySet().stream()
+ .flatMap(entry -> entry.getValue().stream().map(deletedFile -> new Tuple3<>(entry.getKey(), deletedFile, true)))
+ .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
+ partitionToAppendedFiles.entrySet().stream()
+ .flatMap(entry -> entry.getValue().keySet().stream().map(addedFile -> new Tuple3<>(entry.getKey(), addedFile, false)))
+ .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
+ return partitionFileFlagTupleList;
}
/**
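fetchPartitionFileInfoTriplets above appends two streams into one pre-sized list via Collectors.toCollection with a supplier that returns the existing list. A small sketch of that pattern, with made-up values:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ToCollectionSketch {
      public static void main(String[] args) {
        // Pre-size the target list, then append two sequential streams into it,
        // as fetchPartitionFileInfoTriplets does above. With a sequential stream
        // the supplier is invoked once, so both collects accumulate into `out`.
        List<String> out = new ArrayList<>(4);
        Stream.of("p1/a", "p1/b").collect(Collectors.toCollection(() -> out));
        Stream.of("p2/c", "p2/d").collect(Collectors.toCollection(() -> out));
        System.out.println(out); // [p1/a, p1/b, p2/c, p2/d]
      }
    }

Note that this trick relies on the streams being sequential; a parallel stream would invoke the supplier once per chunk and merge the results, so collecting into a shared target list would not accumulate correctly.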