This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 612d02b35a0d2236a0be0d6e94d09fe8d0962c5e
Author: Prashant Wason <[email protected]>
AuthorDate: Fri Aug 11 09:37:19 2023 -0700

    [HUDI-6553] Speedup column stats and bloom index creation on large datasets. (#9223)

    * [HUDI-6553] Speedup column stats and bloom index creation on large datasets.

    * addressing feedback

    * Fix log message

    ---------

    Co-authored-by: sivabalan <[email protected]>
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../metadata/HoodieBackedTableMetadataWriter.java |   6 +
 hudi-common/pom.xml                               |   7 +
 .../hudi/metadata/HoodieTableMetadataUtil.java    | 160 +++++++++------------
 3 files changed, 84 insertions(+), 89 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 74d8ae16176..e99ec493558 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -431,6 +431,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
           + " bootstrap failed for " + metadataMetaClient.getBasePath(), e);
     }

+    LOG.info(String.format("Initializing %s index with %d mappings and %d file groups.", partitionType.name(), fileGroupCountAndRecordsPair.getKey(),
+        fileGroupCountAndRecordsPair.getValue().count()));
+    HoodieTimer partitionInitTimer = HoodieTimer.start();
+
     // Generate the file groups
     final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
     ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType.name() + " should be > 0");
@@ -443,6 +447,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
       dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, true);
       // initialize the metadata reader again so the MDT partition can be read after initialization
       initMetadataReader();
+      long totalInitTime = partitionInitTimer.endTimer();
+      LOG.info(String.format("Initializing %s index in metadata table took " + totalInitTime + " in ms", partitionType.name()));
     }

     return true;
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index 2b4eb2829b8..71f7cf85ab9 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -103,6 +103,13 @@
   </build>

   <dependencies>
+    <!-- Scala -->
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.openjdk.jol</groupId>
       <artifactId>jol-core</artifactId>
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index b50ff114250..08fc663fbad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -18,6 +18,7 @@

 package org.apache.hudi.metadata;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.ConvertingGenericData;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
@@ -87,7 +88,6 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -95,6 +95,8 @@ import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

+import scala.Tuple3;
+
 import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
 import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static org.apache.hudi.avro.HoodieAvroUtils.convertValueForSpecificDataTypes;
@@ -787,61 +789,39 @@ public class HoodieTableMetadataUtil {
                                                                 Map<String, Map<String, Long>> partitionToAppendedFiles,
                                                                 MetadataRecordsGenerationParams recordsGenerationParams,
                                                                 String instantTime) {
-    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
-
-    List<Pair<String, String>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream().flatMap(entry -> {
-      return entry.getValue().stream().map(file -> Pair.of(entry.getKey(), file));
-    }).collect(Collectors.toList());
-
-    int parallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
-    HoodieData<Pair<String, String>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList, parallelism);
-
-    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.map(partitionToDeletedFilePair -> {
-      String partitionName = partitionToDeletedFilePair.getLeft();
-      String deletedFile = partitionToDeletedFilePair.getRight();
-      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
-        return null;
-      }
-      final String partition = getPartitionIdentifier(partitionName);
-      return (HoodieRecord) (HoodieMetadataPayload.createBloomFilterMetadataRecord(
-          partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
-    }).filter(Objects::nonNull);
-    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
-
-    List<Pair<String, String>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream().flatMap(entry -> {
-      return entry.getValue().keySet().stream().map(file -> Pair.of(entry.getKey(), file));
-    }).collect(Collectors.toList());
-
-    parallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
-    HoodieData<Pair<String, String>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList, parallelism);
-
-    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.map(partitionToAppendedFilesPair -> {
-      String partitionName = partitionToAppendedFilesPair.getLeft();
-      String appendedFile = partitionToAppendedFilesPair.getRight();
-      String partition = getPartitionIdentifier(partitionName);
-      if (!FSUtils.isBaseFile(new Path(appendedFile))) {
-        return null;
+    // Create the tuple (partition, filename, isDeleted) to handle both deletes and appends
+    final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = fetchPartitionFileInfoTriplets(partitionToDeletedFiles, partitionToAppendedFiles);
+
+    // Create records MDT
+    int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
+    return engineContext.parallelize(partitionFileFlagTupleList, parallelism).flatMap(partitionFileFlagTuple -> {
+      final String partitionName = partitionFileFlagTuple._1();
+      final String filename = partitionFileFlagTuple._2();
+      final boolean isDeleted = partitionFileFlagTuple._3();
+      if (!FSUtils.isBaseFile(new Path(filename))) {
+        LOG.warn(String.format("Ignoring file %s as it is not a base file", filename));
+        return Stream.<HoodieRecord>empty().iterator();
       }
-      final String pathWithPartition = partitionName + "/" + appendedFile;
-      final Path appendedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
-      try (HoodieFileReader fileReader =
-               HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), appendedFilePath)) {
-        final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
-        if (fileBloomFilter == null) {
-          LOG.error("Failed to read bloom filter for " + appendedFilePath);
-          return null;
+
+      // Read the bloom filter from the base file if the file is being added
+      ByteBuffer bloomFilterBuffer = ByteBuffer.allocate(0);
+      if (!isDeleted) {
+        final String pathWithPartition = partitionName + "/" + filename;
+        final Path addedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
+        bloomFilterBuffer = readBloomFilter(recordsGenerationParams.getDataMetaClient().getHadoopConf(), addedFilePath);
+
+        // If reading the bloom filter failed then do not add a record for this file
+        if (bloomFilterBuffer == null) {
+          LOG.error("Failed to read bloom filter from " + addedFilePath);
+          return Stream.<HoodieRecord>empty().iterator();
        }
-        ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
-        return (HoodieRecord) (HoodieMetadataPayload.createBloomFilterMetadataRecord(
-            partition, appendedFile, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false));
-      } catch (IOException e) {
-        LOG.error("Failed to get bloom filter for file: " + appendedFilePath);
       }
-      return null;
-    }).filter(Objects::nonNull);
-    allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
-    return allRecordsRDD;
+
+      final String partition = getPartitionIdentifier(partitionName);
+      return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+              partition, filename, instantTime, recordsGenerationParams.getBloomFilterType(), bloomFilterBuffer, partitionFileFlagTuple._3()))
+          .iterator();
+    });
   }

   /**
@@ -851,59 +831,61 @@ public class HoodieTableMetadataUtil {
                                                                 Map<String, List<String>> partitionToDeletedFiles,
                                                                 Map<String, Map<String, Long>> partitionToAppendedFiles,
                                                                 MetadataRecordsGenerationParams recordsGenerationParams) {
-    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+    // Find the columns to index
     HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
-
     final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams, Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
-
     if (columnsToIndex.isEmpty()) {
       // In case there are no columns to index, bail
       return engineContext.emptyHoodieData();
     }

-    List<Pair<String, String>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream().flatMap(entry -> {
-      return entry.getValue().stream().map(file -> Pair.of(entry.getKey(), file));
-    }).collect(Collectors.toList());
-
-    int deletedFilesTargetParallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    final HoodieData<Pair<String, String>> partitionToDeletedFilesRDD =
-        engineContext.parallelize(partitionToDeletedFilesList, deletedFilesTargetParallelism);
-
-    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
-      String partitionPath = partitionToDeletedFilesPair.getLeft();
-      String partitionId = getPartitionIdentifier(partitionPath);
-      String deletedFile = partitionToDeletedFilesPair.getRight();
-      String filePathWithPartition = partitionPath + "/" + deletedFile;
-      return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, true).iterator();
-    });
-
-    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
+    LOG.info(String.format("Indexing %d columns for column stats index", columnsToIndex.size()));

-    List<Pair<String, String>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream().flatMap(entry -> {
-      return entry.getValue().keySet().stream().map(file -> Pair.of(entry.getKey(), file));
-    }).collect(Collectors.toList());
+    // Create the tuple (partition, filename, isDeleted) to handle both deletes and appends
+    final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = fetchPartitionFileInfoTriplets(partitionToDeletedFiles, partitionToAppendedFiles);

-    int appendedFilesTargetParallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    final HoodieData<Pair<String, String>> partitionToAppendedFilesRDD =
-        engineContext.parallelize(partitionToAppendedFilesList, appendedFilesTargetParallelism);
-
-    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
-      String partitionPath = partitionToAppendedFilesPair.getLeft();
-      String partitionId = getPartitionIdentifier(partitionPath);
-      String appendedFile = partitionToAppendedFilesPair.getRight();
-      if (!FSUtils.isBaseFile(new Path(appendedFile))
-          || !appendedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+    // Create records MDT
+    int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
+    return engineContext.parallelize(partitionFileFlagTupleList, parallelism).flatMap(partitionFileFlagTuple -> {
+      final String partitionName = partitionFileFlagTuple._1();
+      final String filename = partitionFileFlagTuple._2();
+      final boolean isDeleted = partitionFileFlagTuple._3();
+      if (!FSUtils.isBaseFile(new Path(filename)) || !filename.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+        LOG.warn(String.format("Ignoring file %s as it is not a PARQUET file", filename));
         return Stream.<HoodieRecord>empty().iterator();
       }
-      final String filePathWithPartition = partitionPath + "/" + appendedFile;
-      return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, false).iterator();
+
+      final String filePathWithPartition = partitionName + "/" + filename;
+      final String partitionId = getPartitionIdentifier(partitionName);
+      return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, isDeleted).iterator();
     });
+  }

-    allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
+  private static ByteBuffer readBloomFilter(Configuration conf, Path filePath) throws IOException {
+    try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(conf, filePath)) {
+      final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+      if (fileBloomFilter == null) {
+        return null;
+      }
+      return ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+    }
+  }

-    return allRecordsRDD;
+  private static List<Tuple3<String, String, Boolean>> fetchPartitionFileInfoTriplets(Map<String, List<String>> partitionToDeletedFiles,
+                                                                                      Map<String, Map<String, Long>> partitionToAppendedFiles) {
+    // Total number of files which are added or deleted
+    final int totalFiles = partitionToDeletedFiles.values().stream().mapToInt(List::size).sum()
+        + partitionToAppendedFiles.values().stream().mapToInt(Map::size).sum();
+    final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = new ArrayList<>(totalFiles);
+    partitionToDeletedFiles.entrySet().stream()
+        .flatMap(entry -> entry.getValue().stream().map(deletedFile -> new Tuple3<>(entry.getKey(), deletedFile, true)))
+        .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
+    partitionToAppendedFiles.entrySet().stream()
+        .flatMap(entry -> entry.getValue().keySet().stream().map(addedFile -> new Tuple3<>(entry.getKey(), addedFile, false)))
+        .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
+    return partitionFileFlagTupleList;
   }

   /**
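For reference, the core technique the patch applies is to flatten the per-partition "deleted files" and "appended files" maps into one pre-sized list of (partition, filename, isDeleted) triplets, so a single parallelized pass can emit the metadata records instead of building and unioning two intermediate collections. The minimal standalone sketch below illustrates that shape; it is not part of the commit, the class name and sample file names are illustrative only, and it assumes scala-library on the classpath (the dependency the patch adds to hudi-common/pom.xml so that scala.Tuple3 can be used from Java).

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import scala.Tuple3;

public class PartitionFileTripletSketch {

  // Flatten both per-partition maps into one list of (partition, filename, isDeleted)
  // triplets, pre-sized so the list never has to grow while it is filled.
  static List<Tuple3<String, String, Boolean>> toTriplets(Map<String, List<String>> deletedFiles,
                                                          Map<String, Map<String, Long>> appendedFiles) {
    int totalFiles = deletedFiles.values().stream().mapToInt(List::size).sum()
        + appendedFiles.values().stream().mapToInt(Map::size).sum();
    List<Tuple3<String, String, Boolean>> triplets = new ArrayList<>(totalFiles);
    deletedFiles.forEach((partition, files) ->
        files.forEach(file -> triplets.add(new Tuple3<>(partition, file, true))));
    appendedFiles.forEach((partition, files) ->
        files.keySet().forEach(file -> triplets.add(new Tuple3<>(partition, file, false))));
    return triplets;
  }

  public static void main(String[] args) {
    // Hypothetical inputs mirroring the map shapes used by the patch.
    Map<String, List<String>> deletedFiles =
        Collections.singletonMap("2023/08/11", Arrays.asList("old-base-file.parquet"));
    Map<String, Map<String, Long>> appendedFiles =
        Collections.singletonMap("2023/08/12", Collections.singletonMap("new-base-file.parquet", 1024L));

    // One pass over the combined list replaces the two separate passes that were unioned before.
    for (Tuple3<String, String, Boolean> t : toTriplets(deletedFiles, appendedFiles)) {
      System.out.println(t._1() + "/" + t._2() + (t._3() ? " (deleted)" : " (added)"));
    }
  }
}

In the patch itself the equivalent list is built by fetchPartitionFileInfoTriplets and then fed to engineContext.parallelize(...).flatMap(...), which is what removes the intermediate per-case collections and unions for both the bloom filter and the column stats metadata partitions.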
