nsivabalan commented on PR #5829:
URL: https://github.com/apache/hudi/pull/5829#issuecomment-1152765348
I went back and forth based on feedback given.
Here are my findings.
For one of my test tables in S3, on an EMR cluster:
1. With 0.11.0:
147 secs.
2. With this patch as is (where engine context is not used for 2nd phase)
5.7 secs.
3. Latest master + adding engineContext for 2nd phase:
16 secs.
4. I also tried completely rewriting the DAG (code below):
12 secs.
```
while (!pathsToList.isEmpty()) {
  // TODO: Get the parallelism from HoodieWriteConfig
  int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());

  // List all directories in parallel
  List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path -> {
    FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
    return Arrays.stream(fileSystem.listStatus(path));
  }, listingParallelism);
  pathsToList.clear();

  // If the current directory contains PartitionMetadata, add it to the result;
  // if it does not, add it to the queue so it is listed in the next iteration.
  int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, dirToFileListing.size());
  List<Pair<Option<String>, Option<Path>>> result = engineContext.map(dirToFileListing, fileStatus -> {
    FileSystem fileSystem = fileStatus.getPath().getFileSystem(hadoopConf.get());
    if (fileStatus.isDirectory()) {
      if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fileStatus.getPath())) {
        // Partition directory: emit its path relative to the table base path.
        return Pair.of(
            Option.of(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath())),
            Option.empty());
      } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
        // Non-partition directory (other than the metadata folder): list it next.
        return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
      }
    } else if (fileStatus.getPath().getName()
        .startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
      // Partition metafile: its parent directory is a partition.
      String partitionName =
          FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
      return Pair.of(Option.of(partitionName), Option.empty());
    }
    return Pair.of(Option.empty(), Option.empty());
  }, fileListingParallelism);

  // Left side of each pair: discovered partition paths.
  partitionPaths.addAll(result.stream()
      .filter(entry -> entry.getKey().isPresent())
      .map(entry -> entry.getKey().get())
      .collect(Collectors.toList()));
  // Right side of each pair: directories still to be listed.
  pathsToList.addAll(result.stream()
      .filter(entry -> entry.getValue().isPresent())
      .map(entry -> entry.getValue().get())
      .collect(Collectors.toList()));
}
```
So, based on the above findings, I will go with the patch in its current
state. I will address only Raymond's and Alexey's feedback and unblock
0.11.1.
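
For anyone who wants to experiment with the level-by-level listing from the rewritten DAG without a cluster, here is a minimal, self-contained sketch of the same idea. It is not Hudi code. It assumes a local file system accessed through `java.nio.file`, uses a parallel stream as a stand-in for `engineContext.flatMap`, and hard-codes the marker-file prefix and metadata folder name as illustrative constants. The class name `LevelWiseListing` and the method `listPartitions` are hypothetical. Unlike the snippet above, it keeps descending into every directory and detects a partition only by finding the marker file in a listing.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LevelWiseListing {
  // Illustrative constants (assumptions for this sketch, not read from Hudi).
  static final String PARTITION_METAFILE_PREFIX = ".hoodie_partition_metadata";
  static final String METAFOLDER_NAME = ".hoodie";

  /** Returns partition paths relative to basePath, discovered one directory level at a time. */
  static List<String> listPartitions(Path basePath) {
    List<String> partitions = new ArrayList<>();
    List<Path> pathsToList = new ArrayList<>();
    pathsToList.add(basePath);

    while (!pathsToList.isEmpty()) {
      // Phase 1: list all directories of the current level in parallel.
      List<Path> children = pathsToList.parallelStream()
          .flatMap(dir -> {
            // Files.list is lazy, so materialize the entries before closing the stream.
            try (Stream<Path> entries = Files.list(dir)) {
              return entries.collect(Collectors.toList()).stream();
            } catch (IOException e) {
              throw new UncheckedIOException(e);
            }
          })
          .collect(Collectors.toList());
      pathsToList.clear();

      // Phase 2: classify each entry (done serially here, on the caller's thread).
      for (Path child : children) {
        String name = child.getFileName().toString();
        if (Files.isDirectory(child)) {
          // Queue every directory except the metadata folder for the next level.
          if (!name.equals(METAFOLDER_NAME)) {
            pathsToList.add(child);
          }
        } else if (name.startsWith(PARTITION_METAFILE_PREFIX)) {
          // A marker file means the parent directory is a partition.
          partitions.add(basePath.relativize(child.getParent()).toString());
        }
      }
    }
    return partitions;
  }
}
```

Calling `LevelWiseListing.listPartitions(java.nio.file.Paths.get("/path/to/table"))` on a local copy of a table returns the relative partition paths in no guaranteed order. Phase 2 runs serially in this sketch, which mirrors the shape of the patch as it stands (engine context not used for the second phase), though a local run says nothing about the S3 timings reported above.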