This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fae20cd12a0 [HUDI-6950] Query should process listed partitions to 
avoid driver oom due to large number files in table first partition (#9875)
fae20cd12a0 is described below

commit fae20cd12a0057c8dda7f302699f65a2fe335d0a
Author: xuzifu666 <[email protected]>
AuthorDate: Wed Oct 18 08:40:03 2023 +0800

    [HUDI-6950] Query should process listed partitions to avoid driver oom due 
to large number files in table first partition (#9875)
---
 .../metadata/FileSystemBackedTableMetadata.java    | 95 ++++++++++++----------
 1 file changed, 54 insertions(+), 41 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index f4cd7c29074..3737793e0c6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -54,7 +54,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * Implementation of {@link HoodieTableMetadata} based file-system-backed 
table metadata.
@@ -157,52 +156,66 @@ public class FileSystemBackedTableMetadata extends 
AbstractHoodieTableMetadata {
       // TODO: Get the parallelism from HoodieWriteConfig
       int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, 
pathsToList.size());
 
-      // List all directories in parallel:
-      // if current dictionary contains PartitionMetadata, add it to result
-      // if current dictionary does not contain PartitionMetadata, add its 
subdirectory to queue to be processed.
+      // List all directories in parallel
       engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing all 
partitions with prefix " + relativePathPrefix);
-      // result below holds a list of pair. first entry in the pair optionally 
holds the deduced list of partitions.
-      // and second entry holds optionally a directory path to be processed 
further.
-      List<Pair<Option<String>, Option<Path>>> result = 
engineContext.flatMap(pathsToList, path -> {
+      List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, 
path -> {
         FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
-        if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, path)) {
-          return 
Stream.of(Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(),
 path)), Option.empty()));
-        }
-        return Arrays.stream(fileSystem.listStatus(path))
-            .filter(status -> status.isDirectory() && 
!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
-            .map(status -> Pair.of(Option.empty(), 
Option.of(status.getPath())));
+        return Arrays.stream(fileSystem.listStatus(path));
       }, listingParallelism);
       pathsToList.clear();
 
-      partitionPaths.addAll(result.stream().filter(entry -> 
entry.getKey().isPresent())
-          .map(entry -> entry.getKey().get())
-          .filter(relativePartitionPath -> fullBoundExpr instanceof 
Predicates.TrueExpression
-              || (Boolean) fullBoundExpr.eval(
-              extractPartitionValues(partitionFields, relativePartitionPath, 
urlEncodePartitioningEnabled)))
-          .collect(Collectors.toList()));
-
-      Expression partialBoundExpr;
-      // If partitionPaths is nonEmpty, we're already at the last path level, 
and all paths
-      // are filtered already.
-      if (needPushDownExpressions && partitionPaths.isEmpty()) {
-        // Here we assume the path level matches the number of partition 
columns, so we'll rebuild
-        // new schema based on current path level.
-        // e.g. partition columns are <region, date, hh>, if we're listing the 
second level, then
-        // currentSchema would be <region, date>
-        // `PartialBindVisitor` will bind reference if it can be found from 
`currentSchema`, otherwise
-        // will change the expression to `alwaysTrue`. Can see 
`PartialBindVisitor` for details.
-        Types.RecordType currentSchema = 
Types.RecordType.get(partitionFields.fields().subList(0, 
++currentPartitionLevel));
-        PartialBindVisitor partialBindVisitor = new 
PartialBindVisitor(currentSchema, caseSensitive);
-        partialBoundExpr = pushedExpr.accept(partialBindVisitor);
-      } else {
-        partialBoundExpr = Predicates.alwaysTrue();
-      }
+      // if current dictionary contains PartitionMetadata, add it to result
+      // if current dictionary does not contain PartitionMetadata, add it to 
queue to be processed.
+      int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, 
dirToFileListing.size());
+      if (!dirToFileListing.isEmpty()) {
+        // result below holds a list of pair. first entry in the pair 
optionally holds the deduced list of partitions.
+        // and second entry holds optionally a directory path to be processed 
further.
+        engineContext.setJobStatus(this.getClass().getSimpleName(), 
"Processing listed partitions");
+        List<Pair<Option<String>, Option<Path>>> result = 
engineContext.map(dirToFileListing, fileStatus -> {
+          FileSystem fileSystem = 
fileStatus.getPath().getFileSystem(hadoopConf.get());
+          if (fileStatus.isDirectory()) {
+            if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, 
fileStatus.getPath())) {
+              return 
Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(), 
fileStatus.getPath())), Option.empty());
+            } else if 
(!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) 
{
+              return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
+            }
+          } else if 
(fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
 {
+            String partitionName = 
FSUtils.getRelativePartitionPath(dataBasePath.get(), 
fileStatus.getPath().getParent());
+            return Pair.of(Option.of(partitionName), Option.empty());
+          }
+          return Pair.of(Option.empty(), Option.empty());
+        }, fileListingParallelism);
+
+        partitionPaths.addAll(result.stream().filter(entry -> 
entry.getKey().isPresent())
+            .map(entry -> entry.getKey().get())
+            .filter(relativePartitionPath -> fullBoundExpr instanceof 
Predicates.TrueExpression
+                || (Boolean) fullBoundExpr.eval(
+                extractPartitionValues(partitionFields, relativePartitionPath, 
urlEncodePartitioningEnabled)))
+            .collect(Collectors.toList()));
+
+        Expression partialBoundExpr;
+        // If partitionPaths is nonEmpty, we're already at the last path 
level, and all paths
+        // are filtered already.
+        if (needPushDownExpressions && partitionPaths.isEmpty()) {
+          // Here we assume the path level matches the number of partition 
columns, so we'll rebuild
+          // new schema based on current path level.
+          // e.g. partition columns are <region, date, hh>, if we're listing 
the second level, then
+          // currentSchema would be <region, date>
+          // `PartialBindVisitor` will bind reference if it can be found from 
`currentSchema`, otherwise
+          // will change the expression to `alwaysTrue`. Can see 
`PartialBindVisitor` for details.
+          Types.RecordType currentSchema = 
Types.RecordType.get(partitionFields.fields().subList(0, 
++currentPartitionLevel));
+          PartialBindVisitor partialBindVisitor = new 
PartialBindVisitor(currentSchema, caseSensitive);
+          partialBoundExpr = pushedExpr.accept(partialBindVisitor);
+        } else {
+          partialBoundExpr = Predicates.alwaysTrue();
+        }
 
-      pathsToList.addAll(result.stream().filter(entry -> 
entry.getValue().isPresent()).map(entry -> entry.getValue().get())
-          .filter(path -> partialBoundExpr instanceof Predicates.TrueExpression
-              || (Boolean) partialBoundExpr.eval(
-                  extractPartitionValues(partitionFields, 
FSUtils.getRelativePartitionPath(dataBasePath.get(), path), 
urlEncodePartitioningEnabled)))
-          .collect(Collectors.toList()));
+        pathsToList.addAll(result.stream().filter(entry -> 
entry.getValue().isPresent()).map(entry -> entry.getValue().get())
+            .filter(path -> partialBoundExpr instanceof 
Predicates.TrueExpression
+                || (Boolean) partialBoundExpr.eval(
+                    extractPartitionValues(partitionFields, 
FSUtils.getRelativePartitionPath(dataBasePath.get(), path), 
urlEncodePartitioningEnabled)))
+            .collect(Collectors.toList()));
+      }
     }
     return partitionPaths;
   }

Reply via email to