This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d600e98de63a7a877fd460ee0caca93265fc3bc5 Author: Wechar Yu <[email protected]> AuthorDate: Fri Aug 18 09:43:48 2023 +0800 [HUDI-6476][FOLLOW-UP] Path filter by FileStatus to avoid additional fs request (#9366) --- .../metadata/FileSystemBackedTableMetadata.java | 95 ++++++++++------------ 1 file changed, 41 insertions(+), 54 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index b4a4da01977..8ea9861734a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -54,6 +54,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Implementation of {@link HoodieTableMetadata} based file-system-backed table metadata. @@ -167,66 +168,52 @@ public class FileSystemBackedTableMetadata extends AbstractHoodieTableMetadata { // TODO: Get the parallelism from HoodieWriteConfig int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size()); - // List all directories in parallel + // List all directories in parallel: + // if current directory contains PartitionMetadata, add it to result + // if current directory does not contain PartitionMetadata, add its subdirectory to queue to be processed. engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing all partitions with prefix " + relativePathPrefix); - List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path -> { + // result below holds a list of pair. first entry in the pair optionally holds the deduced list of partitions. + // and second entry holds optionally a directory path to be processed further.
+ List<Pair<Option<String>, Option<Path>>> result = engineContext.flatMap(pathsToList, path -> { FileSystem fileSystem = path.getFileSystem(hadoopConf.get()); - return Arrays.stream(fileSystem.listStatus(path)); + if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, path)) { + return Stream.of(Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(), path)), Option.empty())); + } + return Arrays.stream(fileSystem.listStatus(path)) + .filter(status -> status.isDirectory() && !status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) + .map(status -> Pair.of(Option.empty(), Option.of(status.getPath()))); }, listingParallelism); pathsToList.clear(); - // if current dictionary contains PartitionMetadata, add it to result - // if current dictionary does not contain PartitionMetadata, add it to queue to be processed. - int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, dirToFileListing.size()); - if (!dirToFileListing.isEmpty()) { - // result below holds a list of pair. first entry in the pair optionally holds the deduced list of partitions. - // and second entry holds optionally a directory path to be processed further. 
- engineContext.setJobStatus(this.getClass().getSimpleName(), "Processing listed partitions"); - List<Pair<Option<String>, Option<Path>>> result = engineContext.map(dirToFileListing, fileStatus -> { - FileSystem fileSystem = fileStatus.getPath().getFileSystem(hadoopConf.get()); - if (fileStatus.isDirectory()) { - if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fileStatus.getPath())) { - return Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(), fileStatus.getPath())), Option.empty()); - } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) { - return Pair.of(Option.empty(), Option.of(fileStatus.getPath())); - } - } else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) { - String partitionName = FSUtils.getRelativePartitionPath(dataBasePath.get(), fileStatus.getPath().getParent()); - return Pair.of(Option.of(partitionName), Option.empty()); - } - return Pair.of(Option.empty(), Option.empty()); - }, fileListingParallelism); - - partitionPaths.addAll(result.stream().filter(entry -> entry.getKey().isPresent()) - .map(entry -> entry.getKey().get()) - .filter(relativePartitionPath -> fullBoundExpr instanceof Predicates.TrueExpression - || (Boolean) fullBoundExpr.eval( - extractPartitionValues(partitionFields, relativePartitionPath, urlEncodePartitioningEnabled))) - .collect(Collectors.toList())); - - Expression partialBoundExpr; - // If partitionPaths is nonEmpty, we're already at the last path level, and all paths - // are filtered already. - if (needPushDownExpressions && partitionPaths.isEmpty()) { - // Here we assume the path level matches the number of partition columns, so we'll rebuild - // new schema based on current path level. - // e.g. 
partition columns are <region, date, hh>, if we're listing the second level, then - // currentSchema would be <region, date> - // `PartialBindVisitor` will bind reference if it can be found from `currentSchema`, otherwise - // will change the expression to `alwaysTrue`. Can see `PartialBindVisitor` for details. - Types.RecordType currentSchema = Types.RecordType.get(partitionFields.fields().subList(0, ++currentPartitionLevel)); - PartialBindVisitor partialBindVisitor = new PartialBindVisitor(currentSchema, caseSensitive); - partialBoundExpr = pushedExpr.accept(partialBindVisitor); - } else { - partialBoundExpr = Predicates.alwaysTrue(); - } - - pathsToList.addAll(result.stream().filter(entry -> entry.getValue().isPresent()).map(entry -> entry.getValue().get()) - .filter(path -> partialBoundExpr instanceof Predicates.TrueExpression - || (Boolean) partialBoundExpr.eval( - extractPartitionValues(partitionFields, FSUtils.getRelativePartitionPath(dataBasePath.get(), path), urlEncodePartitioningEnabled))) - .collect(Collectors.toList())); + partitionPaths.addAll(result.stream().filter(entry -> entry.getKey().isPresent()) + .map(entry -> entry.getKey().get()) + .filter(relativePartitionPath -> fullBoundExpr instanceof Predicates.TrueExpression + || (Boolean) fullBoundExpr.eval( + extractPartitionValues(partitionFields, relativePartitionPath, urlEncodePartitioningEnabled))) + .collect(Collectors.toList())); + + Expression partialBoundExpr; + // If partitionPaths is nonEmpty, we're already at the last path level, and all paths + // are filtered already. + if (needPushDownExpressions && partitionPaths.isEmpty()) { + // Here we assume the path level matches the number of partition columns, so we'll rebuild + // new schema based on current path level. + // e.g. 
partition columns are <region, date, hh>, if we're listing the second level, then + // currentSchema would be <region, date> + // `PartialBindVisitor` will bind reference if it can be found from `currentSchema`, otherwise + // will change the expression to `alwaysTrue`. Can see `PartialBindVisitor` for details. + Types.RecordType currentSchema = Types.RecordType.get(partitionFields.fields().subList(0, ++currentPartitionLevel)); + PartialBindVisitor partialBindVisitor = new PartialBindVisitor(currentSchema, caseSensitive); + partialBoundExpr = pushedExpr.accept(partialBindVisitor); + } else { + partialBoundExpr = Predicates.alwaysTrue(); } + + pathsToList.addAll(result.stream().filter(entry -> entry.getValue().isPresent()).map(entry -> entry.getValue().get()) + .filter(path -> partialBoundExpr instanceof Predicates.TrueExpression + || (Boolean) partialBoundExpr.eval( + extractPartitionValues(partitionFields, FSUtils.getRelativePartitionPath(dataBasePath.get(), path), urlEncodePartitioningEnabled))) + .collect(Collectors.toList())); } return partitionPaths; }