This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7fbf7a36690 [HUDI-6476][FOLLOW-UP] Path filter by FileStatus to avoid
additional fs request (#9366)
7fbf7a36690 is described below
commit 7fbf7a366900536053c4333dc7d6f4d0ad9b06b4
Author: Wechar Yu <[email protected]>
AuthorDate: Fri Aug 18 09:43:48 2023 +0800
[HUDI-6476][FOLLOW-UP] Path filter by FileStatus to avoid additional fs
request (#9366)
---
.../metadata/FileSystemBackedTableMetadata.java | 95 ++++++++++------------
1 file changed, 41 insertions(+), 54 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index b4a4da01977..8ea9861734a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Implementation of {@link HoodieTableMetadata} based file-system-backed
table metadata.
@@ -167,66 +168,52 @@ public class FileSystemBackedTableMetadata extends
AbstractHoodieTableMetadata {
// TODO: Get the parallelism from HoodieWriteConfig
int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM,
pathsToList.size());
- // List all directories in parallel
+ // List all directories in parallel:
+ // if the current directory contains PartitionMetadata, add it to the result;
+ // if the current directory does not contain PartitionMetadata, add its
subdirectories to the queue to be processed.
engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing all
partitions with prefix " + relativePathPrefix);
- List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList,
path -> {
+ // "result" below holds a list of pairs. The first entry in each pair
optionally holds the deduced relative partition path,
+ // and the second entry optionally holds a directory path to be processed
further.
+ List<Pair<Option<String>, Option<Path>>> result =
engineContext.flatMap(pathsToList, path -> {
FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
- return Arrays.stream(fileSystem.listStatus(path));
+ if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, path)) {
+ return
Stream.of(Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(),
path)), Option.empty()));
+ }
+ return Arrays.stream(fileSystem.listStatus(path))
+ .filter(status -> status.isDirectory() &&
!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
+ .map(status -> Pair.of(Option.empty(),
Option.of(status.getPath())));
}, listingParallelism);
pathsToList.clear();
- // if current dictionary contains PartitionMetadata, add it to result
- // if current dictionary does not contain PartitionMetadata, add it to
queue to be processed.
- int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM,
dirToFileListing.size());
- if (!dirToFileListing.isEmpty()) {
- // result below holds a list of pair. first entry in the pair
optionally holds the deduced list of partitions.
- // and second entry holds optionally a directory path to be processed
further.
- engineContext.setJobStatus(this.getClass().getSimpleName(),
"Processing listed partitions");
- List<Pair<Option<String>, Option<Path>>> result =
engineContext.map(dirToFileListing, fileStatus -> {
- FileSystem fileSystem =
fileStatus.getPath().getFileSystem(hadoopConf.get());
- if (fileStatus.isDirectory()) {
- if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem,
fileStatus.getPath())) {
- return
Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(),
fileStatus.getPath())), Option.empty());
- } else if
(!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
{
- return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
- }
- } else if
(fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
{
- String partitionName =
FSUtils.getRelativePartitionPath(dataBasePath.get(),
fileStatus.getPath().getParent());
- return Pair.of(Option.of(partitionName), Option.empty());
- }
- return Pair.of(Option.empty(), Option.empty());
- }, fileListingParallelism);
-
- partitionPaths.addAll(result.stream().filter(entry ->
entry.getKey().isPresent())
- .map(entry -> entry.getKey().get())
- .filter(relativePartitionPath -> fullBoundExpr instanceof
Predicates.TrueExpression
- || (Boolean) fullBoundExpr.eval(
- extractPartitionValues(partitionFields, relativePartitionPath,
urlEncodePartitioningEnabled)))
- .collect(Collectors.toList()));
-
- Expression partialBoundExpr;
- // If partitionPaths is nonEmpty, we're already at the last path
level, and all paths
- // are filtered already.
- if (needPushDownExpressions && partitionPaths.isEmpty()) {
- // Here we assume the path level matches the number of partition
columns, so we'll rebuild
- // new schema based on current path level.
- // e.g. partition columns are <region, date, hh>, if we're listing
the second level, then
- // currentSchema would be <region, date>
- // `PartialBindVisitor` will bind reference if it can be found from
`currentSchema`, otherwise
- // will change the expression to `alwaysTrue`. Can see
`PartialBindVisitor` for details.
- Types.RecordType currentSchema =
Types.RecordType.get(partitionFields.fields().subList(0,
++currentPartitionLevel));
- PartialBindVisitor partialBindVisitor = new
PartialBindVisitor(currentSchema, caseSensitive);
- partialBoundExpr = pushedExpr.accept(partialBindVisitor);
- } else {
- partialBoundExpr = Predicates.alwaysTrue();
- }
-
- pathsToList.addAll(result.stream().filter(entry ->
entry.getValue().isPresent()).map(entry -> entry.getValue().get())
- .filter(path -> partialBoundExpr instanceof
Predicates.TrueExpression
- || (Boolean) partialBoundExpr.eval(
- extractPartitionValues(partitionFields,
FSUtils.getRelativePartitionPath(dataBasePath.get(), path),
urlEncodePartitioningEnabled)))
- .collect(Collectors.toList()));
+ partitionPaths.addAll(result.stream().filter(entry ->
entry.getKey().isPresent())
+ .map(entry -> entry.getKey().get())
+ .filter(relativePartitionPath -> fullBoundExpr instanceof
Predicates.TrueExpression
+ || (Boolean) fullBoundExpr.eval(
+ extractPartitionValues(partitionFields, relativePartitionPath,
urlEncodePartitioningEnabled)))
+ .collect(Collectors.toList()));
+
+ Expression partialBoundExpr;
+ // If partitionPaths is nonEmpty, we're already at the last path level,
and all paths
+ // are filtered already.
+ if (needPushDownExpressions && partitionPaths.isEmpty()) {
+ // Here we assume the path level matches the number of partition
columns, so we'll rebuild
+ // new schema based on current path level.
+ // e.g. partition columns are <region, date, hh>, if we're listing the
second level, then
+ // currentSchema would be <region, date>
+ // `PartialBindVisitor` will bind reference if it can be found from
`currentSchema`, otherwise
+ // will change the expression to `alwaysTrue`. Can see
`PartialBindVisitor` for details.
+ Types.RecordType currentSchema =
Types.RecordType.get(partitionFields.fields().subList(0,
++currentPartitionLevel));
+ PartialBindVisitor partialBindVisitor = new
PartialBindVisitor(currentSchema, caseSensitive);
+ partialBoundExpr = pushedExpr.accept(partialBindVisitor);
+ } else {
+ partialBoundExpr = Predicates.alwaysTrue();
}
+
+ pathsToList.addAll(result.stream().filter(entry ->
entry.getValue().isPresent()).map(entry -> entry.getValue().get())
+ .filter(path -> partialBoundExpr instanceof Predicates.TrueExpression
+ || (Boolean) partialBoundExpr.eval(
+ extractPartitionValues(partitionFields,
FSUtils.getRelativePartitionPath(dataBasePath.get(), path),
urlEncodePartitioningEnabled)))
+ .collect(Collectors.toList()));
}
return partitionPaths;
}