umehrot2 commented on a change in pull request #2343:
URL: https://github.com/apache/hudi/pull/2343#discussion_r545546646



##########
File path: 
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -369,10 +343,56 @@ private void bootstrapFromFilesystem(JavaSparkContext 
jsc, HoodieTableMetaClient
       }
     });
 
-    LOG.info("Committing " + partitionFileList.size() + " partitions and " + 
stats[0] + " files to metadata");
+    LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " 
+ stats[0] + " files to metadata");
     update(commitMetadata, createInstantTime);
   }
 
+  /**
+   * Function to find hoodie partitions and list files in them in parallel.
+   *
+   * @param jsc
+   * @param datasetMetaClient
+   * @return Map of partition names to a list of FileStatus for all the files 
in the partition
+   */
+  private Map<String, List<FileStatus>> 
parallelFileSystemListing(JavaSparkContext jsc,

Review comment:
       - Perhaps name it `getPartitionsToFilesMapping()` ?
   - Ideally, it would have been good to have it as a utility in `FsUtils` but 
because of our current structure we can't move it, since `FsUtils` is in 
`hudi-common` which we don't want to depend on `spark`.

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -369,10 +343,56 @@ private void bootstrapFromFilesystem(JavaSparkContext 
jsc, HoodieTableMetaClient
       }
     });
 
-    LOG.info("Committing " + partitionFileList.size() + " partitions and " + 
stats[0] + " files to metadata");
+    LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " 
+ stats[0] + " files to metadata");
     update(commitMetadata, createInstantTime);
   }
 
+  /**
+   * Function to find hoodie partitions and list files in them in parallel.
+   *
+   * @param jsc
+   * @param datasetMetaClient
+   * @return Map of partition names to a list of FileStatus for all the files 
in the partition
+   */
+  private Map<String, List<FileStatus>> 
parallelFileSystemListing(JavaSparkContext jsc,
+      HoodieTableMetaClient datasetMetaClient) {
+    List<Path> pathsToList = new LinkedList<>();
+    pathsToList.add(new Path(datasetWriteConfig.getBasePath()));
+    Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+
+    while (!pathsToList.isEmpty()) {
+      // List all directories in parallel
+      List<Pair<Path, FileStatus[]>> dirToFileListing =
+          jsc.parallelize(pathsToList, Math.min(pathsToList.size(), 
jsc.defaultParallelism()))
+            .map(path -> {
+              FileSystem fs = datasetMetaClient.getFs();

Review comment:
       This adds unnecessary overhead of serialization/deserialization of 
`HoodieTableMetaClient`. Instead, we can  do:
   ```
   path.getFileSystem(new Configuration());
   ```
   to get the file system.

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -369,10 +343,56 @@ private void bootstrapFromFilesystem(JavaSparkContext 
jsc, HoodieTableMetaClient
       }
     });
 
-    LOG.info("Committing " + partitionFileList.size() + " partitions and " + 
stats[0] + " files to metadata");
+    LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " 
+ stats[0] + " files to metadata");
     update(commitMetadata, createInstantTime);
   }
 
+  /**
+   * Function to find hoodie partitions and list files in them in parallel.
+   *
+   * @param jsc
+   * @param datasetMetaClient
+   * @return Map of partition names to a list of FileStatus for all the files 
in the partition
+   */
+  private Map<String, List<FileStatus>> 
parallelFileSystemListing(JavaSparkContext jsc,
+      HoodieTableMetaClient datasetMetaClient) {
+    List<Path> pathsToList = new LinkedList<>();
+    pathsToList.add(new Path(datasetWriteConfig.getBasePath()));
+    Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+
+    while (!pathsToList.isEmpty()) {
+      // List all directories in parallel
+      List<Pair<Path, FileStatus[]>> dirToFileListing =
+          jsc.parallelize(pathsToList, Math.min(pathsToList.size(), 
jsc.defaultParallelism()))

Review comment:
       I think we should introduce like a `LISTING_PARALLELISM` property with a 
default of `1500` and use it here. 
(https://github.com/apache/hudi/blob/rfc-15/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java#L70)

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -369,10 +343,56 @@ private void bootstrapFromFilesystem(JavaSparkContext 
jsc, HoodieTableMetaClient
       }
     });
 
-    LOG.info("Committing " + partitionFileList.size() + " partitions and " + 
stats[0] + " files to metadata");
+    LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " 
+ stats[0] + " files to metadata");
     update(commitMetadata, createInstantTime);
   }
 
+  /**
+   * Function to find hoodie partitions and list files in them in parallel.
+   *
+   * @param jsc
+   * @param datasetMetaClient
+   * @return Map of partition names to a list of FileStatus for all the files 
in the partition
+   */
+  private Map<String, List<FileStatus>> 
parallelFileSystemListing(JavaSparkContext jsc,
+      HoodieTableMetaClient datasetMetaClient) {
+    List<Path> pathsToList = new LinkedList<>();
+    pathsToList.add(new Path(datasetWriteConfig.getBasePath()));
+    Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+
+    while (!pathsToList.isEmpty()) {
+      // List all directories in parallel
+      List<Pair<Path, FileStatus[]>> dirToFileListing =
+          jsc.parallelize(pathsToList, Math.min(pathsToList.size(), 
jsc.defaultParallelism()))
+            .map(path -> {
+              FileSystem fs = datasetMetaClient.getFs();
+              return Pair.of(path, fs.listStatus(path));
+            }).collect();
+      pathsToList.clear();
+
+      // If the listing reveals a directory, add it to queue. If the listing 
reveals a hoodie partition, add it to
+      // the results.
+      dirToFileListing.forEach(p -> {
+        List<FileStatus> filesInDir = Arrays.stream(p.getRight())
+            .filter(fs -> 
!fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
+            .collect(Collectors.toList());

Review comment:
       May be `Arrays.stream().parallel()` ? This can be a huge list to go 
through for large datasets.

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -51,6 +51,7 @@
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;

Review comment:
       Can you re-use this same thing to `getAllPartitionPaths()` here: 
https://github.com/apache/hudi/blob/rfc-15/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java#L149
   
   The default mechanism is really slow. I investigated 
`FsUtils.getAllPartitionPaths()` is also used at multiple places across Hudi 
code-base which can benefit from RFC-15. So, what I am thinking is by default 
we can let `FsUtils.getAllPartitionPaths()` always default to the metadata 
table, and internally if the table is not present, it will use this default 
parallelized/optimized listing of partition paths.
   
   So, it benefits both customers who use metadata table and who don't.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to