[ https://issues.apache.org/jira/browse/HUDI-4242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Raymond Xu updated HUDI-4242:
-----------------------------
Priority: Blocker (was: Critical)
> Follow up on getAllPartitionPaths perf enhancement
> --------------------------------------------------
>
> Key: HUDI-4242
> URL: https://issues.apache.org/jira/browse/HUDI-4242
> Project: Apache Hudi
> Issue Type: Improvement
> Components: performance, reader-core
> Reporter: sivabalan narayanan
> Priority: Blocker
> Fix For: 0.12.1
>
>
> getAllPartitionPaths had some perf degradation from 0.9.0 to 0.10.0, so we
> have reverted the change for now. The change itself was good, though, so we
> want to follow up and see whether we can fix or improve the new code. The old
> code does not leverage the Spark engine to parallelize listing across
> different folders, so there could be room for improvement. However, the perf
> numbers show that this is not straightforward, hence this follow-up ticket.
>
> Excerpt from the findings:
> For one of my test tables in S3, on an EMR cluster (10k partitions):
> # With 0.11.0: 147 secs.
> # With this patch as is (where the engine context is not used for the 2nd phase): 5.7 secs.
> # Latest master + engineContext added for the 2nd phase: 16 secs.
> # I also tried completely rewriting the DAG: 12 secs.
> // Excerpt from the enclosing method: engineContext, hadoopConf, datasetBasePath,
> // partitionPaths and pathsToList are defined outside this loop.
> while (!pathsToList.isEmpty()) {
>   // TODO: Get the parallelism from HoodieWriteConfig
>   int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
>
>   // List all directories in parallel
>   List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path -> {
>     FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
>     return Arrays.stream(fileSystem.listStatus(path));
>   }, listingParallelism);
>   pathsToList.clear();
>
>   // If the current directory contains partition metadata, add it to the result;
>   // if it does not, add it to the queue to be listed in the next iteration.
>   int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, dirToFileListing.size());
>   List<Pair<Option<String>, Option<Path>>> result = engineContext.map(dirToFileListing, fileStatus -> {
>     FileSystem fileSystem = fileStatus.getPath().getFileSystem(hadoopConf.get());
>     if (fileStatus.isDirectory()) {
>       if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fileStatus.getPath())) {
>         // Directory is a partition: record its path relative to the table base path
>         return Pair.of(Option.of(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath())), Option.empty());
>       } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
>         // Neither a partition nor the metadata folder: list it in the next iteration
>         return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
>       }
>     } else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
>       // A partition metafile marks its parent directory as a partition
>       String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
>       return Pair.of(Option.of(partitionName), Option.empty());
>     }
>     return Pair.of(Option.empty(), Option.empty());
>   }, fileListingParallelism);
>
>   partitionPaths.addAll(result.stream()
>       .filter(entry -> entry.getKey().isPresent())
>       .map(entry -> entry.getKey().get())
>       .collect(Collectors.toList()));
>   pathsToList.addAll(result.stream()
>       .filter(entry -> entry.getValue().isPresent())
>       .map(entry -> entry.getValue().get())
>       .collect(Collectors.toList()));
> }
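For comparison with the loop above, here is a minimal local-filesystem sketch of the same level-by-level listing, with the second phase kept off the engine (the variant measured as fastest in the findings). It is an illustration under stated assumptions, not the patch's code: `java.nio.file` stands in for Hadoop `FileSystem`, a parallel stream stands in for `engineContext`, the class `PartitionLister` is hypothetical, and the constant values `.hoodie` and `.hoodie_partition_metadata` are assumed to match `HoodieTableMetaClient.METAFOLDER_NAME` and `HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX`. Unlike the quoted loop, phase 2 decides whether a directory is a partition from that directory's own listing instead of probing for the metafile separately.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PartitionLister {
  // Assumed values; verify against the Hudi version in use.
  static final String METAFOLDER_NAME = ".hoodie";
  static final String PARTITION_METAFILE_PREFIX = ".hoodie_partition_metadata";

  static List<String> listAllPartitionPaths(Path basePath) {
    List<String> partitionPaths = new ArrayList<>();
    List<Path> pathsToList = new ArrayList<>();
    pathsToList.add(basePath);
    while (!pathsToList.isEmpty()) {
      // Phase 1: list every directory of the current level in parallel
      // (the parallel stream plays the role of engineContext).
      List<Map.Entry<Path, List<Path>>> listings = pathsToList.parallelStream()
          .map(dir -> Map.entry(dir, listChildren(dir)))
          .collect(Collectors.toList());
      pathsToList.clear();
      // Phase 2: classify the listings sequentially on the calling thread.
      for (Map.Entry<Path, List<Path>> listing : listings) {
        boolean isPartition = listing.getValue().stream()
            .anyMatch(p -> p.getFileName().toString().startsWith(PARTITION_METAFILE_PREFIX));
        if (isPartition) {
          // The metafile marks this directory as a partition; do not descend further.
          partitionPaths.add(basePath.relativize(listing.getKey()).toString());
        } else {
          // Queue subdirectories for the next level, skipping the table metadata folder.
          for (Path child : listing.getValue()) {
            if (Files.isDirectory(child) && !child.getFileName().toString().equals(METAFOLDER_NAME)) {
              pathsToList.add(child);
            }
          }
        }
      }
    }
    return partitionPaths;
  }

  // Lists one directory, rethrowing IOException unchecked so it can run inside a lambda.
  private static List<Path> listChildren(Path dir) {
    try (Stream<Path> entries = Files.list(dir)) {
      return entries.collect(Collectors.toList());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The returned order depends on directory listing order, so callers that need a stable result should sort it.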
> So, based on the above findings, I will go with this patch in its current
> state. I will address only Raymond's and Alexey's feedback and unblock
> 0.11.1.
>
> Ref patch: https://github.com/apache/hudi/pull/5829
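The timings in the findings (S3 table, 10k partitions, EMR) work out to speedups over 0.11.0 of roughly 26x for the patch as is, 12x for the rewritten DAG and 9x for master with engineContext in the 2nd phase; moving the 2nd phase onto the engine is about 2.8x slower than the patch as is. The sketch below only redoes that division; the class `ListingSpeedup` and its method names are illustrative, and the numbers are copied from the findings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ListingSpeedup {
  // Wall-clock seconds reported in the findings, in the order they were listed.
  static Map<String, Double> timings() {
    Map<String, Double> t = new LinkedHashMap<>();
    t.put("0.11.0", 147.0);
    t.put("patch as is (no engineContext in 2nd phase)", 5.7);
    t.put("master + engineContext in 2nd phase", 16.0);
    t.put("rewritten DAG", 12.0);
    return t;
  }

  // Speedup of a variant relative to a baseline: baseline time / variant time.
  static double speedup(double baselineSecs, double variantSecs) {
    return baselineSecs / variantSecs;
  }

  public static void main(String[] args) {
    Map<String, Double> t = timings();
    double baseline = t.get("0.11.0");
    for (Map.Entry<String, Double> e : t.entrySet()) {
      System.out.printf("%-45s %6.1f s  %5.1fx%n", e.getKey(), e.getValue(), speedup(baseline, e.getValue()));
    }
  }
}
```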