This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fbf7a36690 [HUDI-6476][FOLLOW-UP] Path filter by FileStatus to avoid 
additional fs request (#9366)
7fbf7a36690 is described below

commit 7fbf7a366900536053c4333dc7d6f4d0ad9b06b4
Author: Wechar Yu <[email protected]>
AuthorDate: Fri Aug 18 09:43:48 2023 +0800

    [HUDI-6476][FOLLOW-UP] Path filter by FileStatus to avoid additional fs 
request (#9366)
---
 .../metadata/FileSystemBackedTableMetadata.java    | 95 ++++++++++------------
 1 file changed, 41 insertions(+), 54 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index b4a4da01977..8ea9861734a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Implementation of {@link HoodieTableMetadata} based file-system-backed 
table metadata.
@@ -167,66 +168,52 @@ public class FileSystemBackedTableMetadata extends 
AbstractHoodieTableMetadata {
       // TODO: Get the parallelism from HoodieWriteConfig
       int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, 
pathsToList.size());
 
-      // List all directories in parallel
+      // List all directories in parallel:
+      // if current dictionary contains PartitionMetadata, add it to result
+      // if current dictionary does not contain PartitionMetadata, add its 
subdirectory to queue to be processed.
       engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing all 
partitions with prefix " + relativePathPrefix);
-      List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, 
path -> {
+      // result below holds a list of pair. first entry in the pair optionally 
holds the deduced list of partitions.
+      // and second entry holds optionally a directory path to be processed 
further.
+      List<Pair<Option<String>, Option<Path>>> result = 
engineContext.flatMap(pathsToList, path -> {
         FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
-        return Arrays.stream(fileSystem.listStatus(path));
+        if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, path)) {
+          return 
Stream.of(Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(),
 path)), Option.empty()));
+        }
+        return Arrays.stream(fileSystem.listStatus(path))
+            .filter(status -> status.isDirectory() && 
!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
+            .map(status -> Pair.of(Option.empty(), 
Option.of(status.getPath())));
       }, listingParallelism);
       pathsToList.clear();
 
-      // if current dictionary contains PartitionMetadata, add it to result
-      // if current dictionary does not contain PartitionMetadata, add it to 
queue to be processed.
-      int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, 
dirToFileListing.size());
-      if (!dirToFileListing.isEmpty()) {
-        // result below holds a list of pair. first entry in the pair 
optionally holds the deduced list of partitions.
-        // and second entry holds optionally a directory path to be processed 
further.
-        engineContext.setJobStatus(this.getClass().getSimpleName(), 
"Processing listed partitions");
-        List<Pair<Option<String>, Option<Path>>> result = 
engineContext.map(dirToFileListing, fileStatus -> {
-          FileSystem fileSystem = 
fileStatus.getPath().getFileSystem(hadoopConf.get());
-          if (fileStatus.isDirectory()) {
-            if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, 
fileStatus.getPath())) {
-              return 
Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(), 
fileStatus.getPath())), Option.empty());
-            } else if 
(!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) 
{
-              return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
-            }
-          } else if 
(fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
 {
-            String partitionName = 
FSUtils.getRelativePartitionPath(dataBasePath.get(), 
fileStatus.getPath().getParent());
-            return Pair.of(Option.of(partitionName), Option.empty());
-          }
-          return Pair.of(Option.empty(), Option.empty());
-        }, fileListingParallelism);
-
-        partitionPaths.addAll(result.stream().filter(entry -> 
entry.getKey().isPresent())
-            .map(entry -> entry.getKey().get())
-            .filter(relativePartitionPath -> fullBoundExpr instanceof 
Predicates.TrueExpression
-                || (Boolean) fullBoundExpr.eval(
-                extractPartitionValues(partitionFields, relativePartitionPath, 
urlEncodePartitioningEnabled)))
-            .collect(Collectors.toList()));
-
-        Expression partialBoundExpr;
-        // If partitionPaths is nonEmpty, we're already at the last path 
level, and all paths
-        // are filtered already.
-        if (needPushDownExpressions && partitionPaths.isEmpty()) {
-          // Here we assume the path level matches the number of partition 
columns, so we'll rebuild
-          // new schema based on current path level.
-          // e.g. partition columns are <region, date, hh>, if we're listing 
the second level, then
-          // currentSchema would be <region, date>
-          // `PartialBindVisitor` will bind reference if it can be found from 
`currentSchema`, otherwise
-          // will change the expression to `alwaysTrue`. Can see 
`PartialBindVisitor` for details.
-          Types.RecordType currentSchema = 
Types.RecordType.get(partitionFields.fields().subList(0, 
++currentPartitionLevel));
-          PartialBindVisitor partialBindVisitor = new 
PartialBindVisitor(currentSchema, caseSensitive);
-          partialBoundExpr = pushedExpr.accept(partialBindVisitor);
-        } else {
-          partialBoundExpr = Predicates.alwaysTrue();
-        }
-
-        pathsToList.addAll(result.stream().filter(entry -> 
entry.getValue().isPresent()).map(entry -> entry.getValue().get())
-            .filter(path -> partialBoundExpr instanceof 
Predicates.TrueExpression
-                || (Boolean) partialBoundExpr.eval(
-                extractPartitionValues(partitionFields, 
FSUtils.getRelativePartitionPath(dataBasePath.get(), path), 
urlEncodePartitioningEnabled)))
-            .collect(Collectors.toList()));
+      partitionPaths.addAll(result.stream().filter(entry -> 
entry.getKey().isPresent())
+          .map(entry -> entry.getKey().get())
+          .filter(relativePartitionPath -> fullBoundExpr instanceof 
Predicates.TrueExpression
+              || (Boolean) fullBoundExpr.eval(
+              extractPartitionValues(partitionFields, relativePartitionPath, 
urlEncodePartitioningEnabled)))
+          .collect(Collectors.toList()));
+
+      Expression partialBoundExpr;
+      // If partitionPaths is nonEmpty, we're already at the last path level, 
and all paths
+      // are filtered already.
+      if (needPushDownExpressions && partitionPaths.isEmpty()) {
+        // Here we assume the path level matches the number of partition 
columns, so we'll rebuild
+        // new schema based on current path level.
+        // e.g. partition columns are <region, date, hh>, if we're listing the 
second level, then
+        // currentSchema would be <region, date>
+        // `PartialBindVisitor` will bind reference if it can be found from 
`currentSchema`, otherwise
+        // will change the expression to `alwaysTrue`. Can see 
`PartialBindVisitor` for details.
+        Types.RecordType currentSchema = 
Types.RecordType.get(partitionFields.fields().subList(0, 
++currentPartitionLevel));
+        PartialBindVisitor partialBindVisitor = new 
PartialBindVisitor(currentSchema, caseSensitive);
+        partialBoundExpr = pushedExpr.accept(partialBindVisitor);
+      } else {
+        partialBoundExpr = Predicates.alwaysTrue();
       }
+
+      pathsToList.addAll(result.stream().filter(entry -> 
entry.getValue().isPresent()).map(entry -> entry.getValue().get())
+          .filter(path -> partialBoundExpr instanceof Predicates.TrueExpression
+              || (Boolean) partialBoundExpr.eval(
+                  extractPartitionValues(partitionFields, 
FSUtils.getRelativePartitionPath(dataBasePath.get(), path), 
urlEncodePartitioningEnabled)))
+          .collect(Collectors.toList()));
     }
     return partitionPaths;
   }

Reply via email to