This is an automated email from the ASF dual-hosted git repository. vinoth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2bd4a68731826698d120765c534cabe6c3069a95 Author: Prashant Wason <[email protected]> AuthorDate: Thu Dec 31 01:40:12 2020 -0800 [HUDI-1469] Faster initialization of metadata table using parallelized listing. (#2343) * [HUDI-1469] Faster initialization of metadata table using parallelized listing which finds partitions and files in a single scan. * MINOR fixes Co-authored-by: Vinoth Chandar <[email protected]> --- .../org/apache/hudi/config/HoodieWriteConfig.java | 12 +++ .../metadata/HoodieBackedTableMetadataWriter.java | 89 ++++++++++++++-------- 2 files changed, 68 insertions(+), 33 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 138f1be..2a26abd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -82,6 +82,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { public static final String BULKINSERT_INPUT_DATA_SCHEMA_DDL = "hoodie.bulkinsert.schema.ddl"; public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"; public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism"; + public static final String FILE_LISTING_PARALLELISM = "hoodie.file.listing.parallelism"; public static final String DEFAULT_ROLLBACK_PARALLELISM = "100"; public static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism"; public static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes"; @@ -256,6 +257,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM)); } + public int getFileListingParallelism() { + return Math.max(Integer.parseInt(props.getProperty(FILE_LISTING_PARALLELISM)), 1); + } + public boolean shouldRollbackUsingMarkers() { return Boolean.parseBoolean(props.getProperty(ROLLBACK_USING_MARKERS)); } @@ -1002,6 +1007,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withFileListingParallelism(int parallelism) { + props.setProperty(FILE_LISTING_PARALLELISM, String.valueOf(parallelism)); + return this; + } + public Builder withUserDefinedBulkInsertPartitionerClass(String className) { props.setProperty(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS, className); return this; @@ -1188,6 +1198,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { DEFAULT_PARALLELISM); setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM); setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM); + setDefaultOnCondition(props, !props.containsKey(FILE_LISTING_PARALLELISM), FILE_LISTING_PARALLELISM, + DEFAULT_PARALLELISM); setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM, DEFAULT_ROLLBACK_PARALLELISM); setDefaultOnCondition(props, !props.containsKey(KEYGENERATOR_CLASS_PROP), diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 9282e3b..ed24980 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -274,44 +274,19 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta initTableMetadata(); // List all partitions in the basePath of the containing dataset - FileSystem fs = datasetMetaClient.getFs(); - FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetWriteConfig.getBasePath(), - datasetWriteConfig.shouldAssumeDatePartitioning()); - List<String> partitions = fileSystemBackedTableMetadata.getAllPartitionPaths(); - LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions"); - - // List all partitions in parallel and collect the files in them - int parallelism = Math.max(partitions.size(), 1); - List<Pair<String, FileStatus[]>> partitionFileList = engineContext.map(partitions, partition -> { - FileStatus[] statuses = fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(datasetWriteConfig.getBasePath(), partition)); - return Pair.of(partition, statuses); - }, parallelism); + LOG.info("Initializing metadata table by using file listings in " + datasetWriteConfig.getBasePath()); + Map<String, List<FileStatus>> partitionToFileStatus = getPartitionsToFilesMapping(datasetMetaClient); // Create a HoodieCommitMetadata with writeStats for all discovered files int[] stats = {0}; HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); - partitionFileList.forEach(t -> { - final String partition = t.getKey(); - try { - if (!fs.exists(new Path(datasetWriteConfig.getBasePath(), partition + Path.SEPARATOR + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) { - return; - } - } catch (IOException e) { - throw new HoodieMetadataException("Failed to check partition " + partition, e); - } - + partitionToFileStatus.forEach((partition, statuses) -> { // Filter the statuses to only include files which were created before or on createInstantTime - Arrays.stream(t.getValue()).filter(status -> { + statuses.stream().filter(status -> { String filename = status.getPath().getName(); - if (filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) { - return false; - } - if (HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN, - createInstantTime)) { - return false; - } - return true; + return !HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN, + createInstantTime); }).forEach(status -> { HoodieWriteStat writeStat = new HoodieWriteStat(); writeStat.setPath(partition + Path.SEPARATOR + status.getPath().getName()); @@ -329,11 +304,57 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta } }); - LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata"); + LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata"); update(commitMetadata, createInstantTime); } /** + * Function to find hoodie partitions and list files in them in parallel. + * + * @param datasetMetaClient + * @return Map of partition names to a list of FileStatus for all the files in the partition + */ + private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMetaClient datasetMetaClient) { + List<Path> pathsToList = new LinkedList<>(); + pathsToList.add(new Path(datasetWriteConfig.getBasePath())); + + Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>(); + final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism(); + SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf()); + + while (!pathsToList.isEmpty()) { + int listingParallelism = Math.min(fileListingParallelism, pathsToList.size()); + // List all directories in parallel + List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> { + FileSystem fs = path.getFileSystem(conf.get()); + return Pair.of(path, fs.listStatus(path)); + }, listingParallelism); + pathsToList.clear(); + + // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to + // the results. + dirToFileListing.forEach(p -> { + List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel() + .filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) + .collect(Collectors.toList()); + + if (p.getRight().length > filesInDir.size()) { + // Is a partition. Add all data files to result. + partitionToFileStatus.put(p.getLeft().getName(), filesInDir); + } else { + // Add sub-dirs to the queue + pathsToList.addAll(Arrays.stream(p.getRight()) + .filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) + .map(fs -> fs.getPath()) + .collect(Collectors.toList())); + } + }); + } + + return partitionToFileStatus; + } + + /** * Sync the Metadata Table from the instants created on the dataset. * * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset @@ -413,7 +434,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta writeStats.forEach(hoodieWriteStat -> { String pathWithPartition = hoodieWriteStat.getPath(); if (pathWithPartition == null) { - throw new HoodieMetadataException("Unable to find path in write stat to update metadata table " + hoodieWriteStat); + // Empty partition + LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat); + return; } int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1;
