This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fae20cd12a0 [HUDI-6950] Query should process listed partitions to
avoid driver OOM due to a large number of files in the table's first partition (#9875)
fae20cd12a0 is described below
commit fae20cd12a0057c8dda7f302699f65a2fe335d0a
Author: xuzifu666 <[email protected]>
AuthorDate: Wed Oct 18 08:40:03 2023 +0800
[HUDI-6950] Query should process listed partitions to avoid driver OOM due
to a large number of files in the table's first partition (#9875)
---
.../metadata/FileSystemBackedTableMetadata.java | 95 ++++++++++++----------
1 file changed, 54 insertions(+), 41 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index f4cd7c29074..3737793e0c6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -54,7 +54,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
/**
* Implementation of {@link HoodieTableMetadata} based file-system-backed
table metadata.
@@ -157,52 +156,66 @@ public class FileSystemBackedTableMetadata extends
AbstractHoodieTableMetadata {
// TODO: Get the parallelism from HoodieWriteConfig
int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM,
pathsToList.size());
- // List all directories in parallel:
- // if current dictionary contains PartitionMetadata, add it to result
- // if current dictionary does not contain PartitionMetadata, add its
subdirectory to queue to be processed.
+ // List all directories in parallel
engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing all
partitions with prefix " + relativePathPrefix);
- // result below holds a list of pair. first entry in the pair optionally
holds the deduced list of partitions.
- // and second entry holds optionally a directory path to be processed
further.
- List<Pair<Option<String>, Option<Path>>> result =
engineContext.flatMap(pathsToList, path -> {
+ List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList,
path -> {
FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
- if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, path)) {
- return
Stream.of(Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(),
path)), Option.empty()));
- }
- return Arrays.stream(fileSystem.listStatus(path))
- .filter(status -> status.isDirectory() &&
!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
- .map(status -> Pair.of(Option.empty(),
Option.of(status.getPath())));
+ return Arrays.stream(fileSystem.listStatus(path));
}, listingParallelism);
pathsToList.clear();
- partitionPaths.addAll(result.stream().filter(entry ->
entry.getKey().isPresent())
- .map(entry -> entry.getKey().get())
- .filter(relativePartitionPath -> fullBoundExpr instanceof
Predicates.TrueExpression
- || (Boolean) fullBoundExpr.eval(
- extractPartitionValues(partitionFields, relativePartitionPath,
urlEncodePartitioningEnabled)))
- .collect(Collectors.toList()));
-
- Expression partialBoundExpr;
- // If partitionPaths is nonEmpty, we're already at the last path level,
and all paths
- // are filtered already.
- if (needPushDownExpressions && partitionPaths.isEmpty()) {
- // Here we assume the path level matches the number of partition
columns, so we'll rebuild
- // new schema based on current path level.
- // e.g. partition columns are <region, date, hh>, if we're listing the
second level, then
- // currentSchema would be <region, date>
- // `PartialBindVisitor` will bind reference if it can be found from
`currentSchema`, otherwise
- // will change the expression to `alwaysTrue`. Can see
`PartialBindVisitor` for details.
- Types.RecordType currentSchema =
Types.RecordType.get(partitionFields.fields().subList(0,
++currentPartitionLevel));
- PartialBindVisitor partialBindVisitor = new
PartialBindVisitor(currentSchema, caseSensitive);
- partialBoundExpr = pushedExpr.accept(partialBindVisitor);
- } else {
- partialBoundExpr = Predicates.alwaysTrue();
- }
+ // if current directory contains PartitionMetadata, add it to result
+ // if current directory does not contain PartitionMetadata, add it to
queue to be processed.
+ int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM,
dirToFileListing.size());
+ if (!dirToFileListing.isEmpty()) {
+ // result below holds a list of pair. first entry in the pair
optionally holds the deduced list of partitions.
+ // and second entry holds optionally a directory path to be processed
further.
+ engineContext.setJobStatus(this.getClass().getSimpleName(),
"Processing listed partitions");
+ List<Pair<Option<String>, Option<Path>>> result =
engineContext.map(dirToFileListing, fileStatus -> {
+ FileSystem fileSystem =
fileStatus.getPath().getFileSystem(hadoopConf.get());
+ if (fileStatus.isDirectory()) {
+ if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem,
fileStatus.getPath())) {
+ return
Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(),
fileStatus.getPath())), Option.empty());
+ } else if
(!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
{
+ return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
+ }
+ } else if
(fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
{
+ String partitionName =
FSUtils.getRelativePartitionPath(dataBasePath.get(),
fileStatus.getPath().getParent());
+ return Pair.of(Option.of(partitionName), Option.empty());
+ }
+ return Pair.of(Option.empty(), Option.empty());
+ }, fileListingParallelism);
+
+ partitionPaths.addAll(result.stream().filter(entry ->
entry.getKey().isPresent())
+ .map(entry -> entry.getKey().get())
+ .filter(relativePartitionPath -> fullBoundExpr instanceof
Predicates.TrueExpression
+ || (Boolean) fullBoundExpr.eval(
+ extractPartitionValues(partitionFields, relativePartitionPath,
urlEncodePartitioningEnabled)))
+ .collect(Collectors.toList()));
+
+ Expression partialBoundExpr;
+ // If partitionPaths is nonEmpty, we're already at the last path
level, and all paths
+ // are filtered already.
+ if (needPushDownExpressions && partitionPaths.isEmpty()) {
+ // Here we assume the path level matches the number of partition
columns, so we'll rebuild
+ // new schema based on current path level.
+ // e.g. partition columns are <region, date, hh>, if we're listing
the second level, then
+ // currentSchema would be <region, date>
+ // `PartialBindVisitor` will bind reference if it can be found from
`currentSchema`, otherwise
+ // will change the expression to `alwaysTrue`. Can see
`PartialBindVisitor` for details.
+ Types.RecordType currentSchema =
Types.RecordType.get(partitionFields.fields().subList(0,
++currentPartitionLevel));
+ PartialBindVisitor partialBindVisitor = new
PartialBindVisitor(currentSchema, caseSensitive);
+ partialBoundExpr = pushedExpr.accept(partialBindVisitor);
+ } else {
+ partialBoundExpr = Predicates.alwaysTrue();
+ }
- pathsToList.addAll(result.stream().filter(entry ->
entry.getValue().isPresent()).map(entry -> entry.getValue().get())
- .filter(path -> partialBoundExpr instanceof Predicates.TrueExpression
- || (Boolean) partialBoundExpr.eval(
- extractPartitionValues(partitionFields,
FSUtils.getRelativePartitionPath(dataBasePath.get(), path),
urlEncodePartitioningEnabled)))
- .collect(Collectors.toList()));
+ pathsToList.addAll(result.stream().filter(entry ->
entry.getValue().isPresent()).map(entry -> entry.getValue().get())
+ .filter(path -> partialBoundExpr instanceof
Predicates.TrueExpression
+ || (Boolean) partialBoundExpr.eval(
+ extractPartitionValues(partitionFields,
FSUtils.getRelativePartitionPath(dataBasePath.get(), path),
urlEncodePartitioningEnabled)))
+ .collect(Collectors.toList()));
+ }
}
return partitionPaths;
}