nsivabalan commented on PR #5829:
URL: https://github.com/apache/hudi/pull/5829#issuecomment-1152765348

   I went back and forth on this based on the feedback given.
   
   Here are my findings for one of my test tables in S3, on an EMR cluster:
   1. With 0.11.0: 147 secs.
   2. With this patch as is (where the engine context is not used for the 2nd phase): 5.7 secs.
   3. Latest master + adding engineContext for the 2nd phase: 16 secs.
   4. I also tried completely rewriting the DAG (code below): 12 secs.
   ```
   
         while (!pathsToList.isEmpty()) {
           // TODO: Get the parallelism from HoodieWriteConfig
           int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
   
           // List all directories in parallel
           List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path -> {
             FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
             return Arrays.stream(fileSystem.listStatus(path));
           }, listingParallelism);
           pathsToList.clear();
   
           // If the current directory contains PartitionMetadata, add it to the result;
           // otherwise, add it to the queue
           int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, dirToFileListing.size());
           List<Pair<Option<String>, Option<Path>>> result = engineContext.map(dirToFileListing, fileStatus -> {
             FileSystem fileSystem = fileStatus.getPath().getFileSystem(hadoopConf.get());
             if (fileStatus.isDirectory()) {
               if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fileStatus.getPath())) {
                 return Pair.of(Option.of(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath())), Option.empty());
               } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
                 return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
               }
             } else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
               String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
               return Pair.of(Option.of(partitionName), Option.empty());
             }
             return Pair.of(Option.empty(), Option.empty());
           }, fileListingParallelism);
   
           partitionPaths.addAll(result.stream().filter(entry -> entry.getKey().isPresent()).map(entry -> entry.getKey().get())
               .collect(Collectors.toList()));
   
           pathsToList.addAll(result.stream().filter(entry -> entry.getValue().isPresent()).map(entry -> entry.getValue().get())
               .collect(Collectors.toList()));
         }
   ```
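
For anyone who wants to try the level-by-level traversal without a Hadoop or Spark setup, here is a rough, self-contained sketch of the same idea on a local filesystem. It is not the Hudi code: `java.nio.file` and parallel streams stand in for `FileSystem` and `engineContext`, the class and method names are made up for illustration, and the two marker names are assumed values for the Hudi constants referenced above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for the listing loop above; not part of Hudi.
class LevelWiseListing {
  // Assumed values for HOODIE_PARTITION_METAFILE_PREFIX and METAFOLDER_NAME.
  static final String PARTITION_METAFILE_PREFIX = ".hoodie_partition_metadata";
  static final String METAFOLDER_NAME = ".hoodie";

  // Materializes the children of a directory so the directory stream can be closed.
  static List<Path> listChildren(Path dir) {
    try (Stream<Path> entries = Files.list(dir)) {
      return entries.collect(Collectors.toList());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // True if the directory directly contains a partition marker file.
  static boolean hasPartitionMarker(Path dir) {
    return listChildren(dir).stream()
        .anyMatch(p -> p.getFileName().toString().startsWith(PARTITION_METAFILE_PREFIX));
  }

  static List<String> listPartitions(Path basePath) {
    List<String> partitionPaths = new ArrayList<>();
    List<Path> pathsToList = new ArrayList<>();
    pathsToList.add(basePath);

    while (!pathsToList.isEmpty()) {
      // Phase 1: list all directories of the current level in parallel.
      List<Path> children = pathsToList.parallelStream()
          .flatMap(dir -> listChildren(dir).stream())
          .collect(Collectors.toList());
      pathsToList.clear();

      // Phase 2: classify each entry in parallel as (partition, directory to list next).
      List<Map.Entry<Optional<String>, Optional<Path>>> result = children.parallelStream()
          .map(child -> {
            String name = child.getFileName().toString();
            Optional<String> partition = Optional.empty();
            Optional<Path> next = Optional.empty();
            if (Files.isDirectory(child)) {
              if (hasPartitionMarker(child)) {
                partition = Optional.of(basePath.relativize(child).toString());
              } else if (!name.equals(METAFOLDER_NAME)) {
                next = Optional.of(child);
              }
            } else if (name.startsWith(PARTITION_METAFILE_PREFIX)) {
              partition = Optional.of(basePath.relativize(child.getParent()).toString());
            }
            return new SimpleImmutableEntry<>(partition, next);
          })
          .collect(Collectors.toList());

      result.forEach(e -> {
        e.getKey().ifPresent(partitionPaths::add);
        e.getValue().ifPresent(pathsToList::add);
      });
    }
    return partitionPaths;
  }
}
```

Each loop iteration handles one directory depth, so the number of sequential rounds grows with the depth of the partition tree rather than with the number of partitions. The order of the returned paths is not guaranteed, since `Files.list` does not define an iteration order.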
   
   So, based on the above findings, I will go with what we have in this patch in its current state. I will address only Raymond's and Alexey's feedback and unblock 0.11.1.
   
   
   

