umehrot2 commented on a change in pull request #2343:
URL: https://github.com/apache/hudi/pull/2343#discussion_r545546646
##########
File path:
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -369,10 +343,56 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient
}
});
- LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
+ LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
update(commitMetadata, createInstantTime);
}
+ /**
+ * Function to find hoodie partitions and list files in them in parallel.
+ *
+ * @param jsc
+ * @param datasetMetaClient
+ * @return Map of partition names to a list of FileStatus for all the files in the partition
+ */
+ private Map<String, List<FileStatus>> parallelFileSystemListing(JavaSparkContext jsc,
Review comment:
- Perhaps name it `getPartitionsToFilesMapping()`?
- Ideally this would live as a utility in `FsUtils`, but with our current module structure we can't move it there: `FsUtils` is in `hudi-common`, which we don't want to depend on `spark`.
##########
File path:
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -369,10 +343,56 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient
}
});
- LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
+ LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
update(commitMetadata, createInstantTime);
}
+ /**
+ * Function to find hoodie partitions and list files in them in parallel.
+ *
+ * @param jsc
+ * @param datasetMetaClient
+ * @return Map of partition names to a list of FileStatus for all the files in the partition
+ */
+ private Map<String, List<FileStatus>> parallelFileSystemListing(JavaSparkContext jsc,
+ HoodieTableMetaClient datasetMetaClient) {
+ List<Path> pathsToList = new LinkedList<>();
+ pathsToList.add(new Path(datasetWriteConfig.getBasePath()));
+ Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+
+ while (!pathsToList.isEmpty()) {
+ // List all directories in parallel
+ List<Pair<Path, FileStatus[]>> dirToFileListing =
+ jsc.parallelize(pathsToList, Math.min(pathsToList.size(), jsc.defaultParallelism()))
+ .map(path -> {
+ FileSystem fs = datasetMetaClient.getFs();
Review comment:
This adds the unnecessary overhead of serializing/deserializing `HoodieTableMetaClient` for every Spark task. Instead, we can do:
```
path.getFileSystem(new Configuration());
```
inside the closure to get the file system.
##########
File path:
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -369,10 +343,56 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient
}
});
- LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
+ LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
update(commitMetadata, createInstantTime);
}
+ /**
+ * Function to find hoodie partitions and list files in them in parallel.
+ *
+ * @param jsc
+ * @param datasetMetaClient
+ * @return Map of partition names to a list of FileStatus for all the files in the partition
+ */
+ private Map<String, List<FileStatus>> parallelFileSystemListing(JavaSparkContext jsc,
+ HoodieTableMetaClient datasetMetaClient) {
+ List<Path> pathsToList = new LinkedList<>();
+ pathsToList.add(new Path(datasetWriteConfig.getBasePath()));
+ Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+
+ while (!pathsToList.isEmpty()) {
+ // List all directories in parallel
+ List<Pair<Path, FileStatus[]>> dirToFileListing =
+ jsc.parallelize(pathsToList, Math.min(pathsToList.size(), jsc.defaultParallelism()))
Review comment:
I think we should introduce a `LISTING_PARALLELISM` property with a default of `1500` and use it here.
(https://github.com/apache/hudi/blob/rfc-15/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java#L70)
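To illustrate the suggestion, here is a minimal, self-contained sketch of how such a property could be resolved. The property key `hoodie.metadata.listing.parallelism`, the class name, and the method are hypothetical stand-ins, not Hudi code; only the `1500` default and the `Math.min(...)` capping against the number of paths come from the review and the diff above.

```java
import java.util.Properties;

public class ListingParallelismSketch {
  // Hypothetical key and default, mirroring the suggested LISTING_PARALLELISM property.
  static final String LISTING_PARALLELISM_PROP = "hoodie.metadata.listing.parallelism";
  static final int DEFAULT_LISTING_PARALLELISM = 1500;

  // Resolve the parallelism for a listing round: never more tasks than paths,
  // never more than the configured (or default) ceiling.
  static int resolveParallelism(Properties props, int numPaths) {
    int configured = Integer.parseInt(
        props.getProperty(LISTING_PARALLELISM_PROP, String.valueOf(DEFAULT_LISTING_PARALLELISM)));
    return Math.min(numPaths, configured);
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println(resolveParallelism(props, 40));  // capped by path count: 40
    props.setProperty(LISTING_PARALLELISM_PROP, "8");
    System.out.println(resolveParallelism(props, 40));  // capped by config: 8
  }
}
```

The resolved value would replace `jsc.defaultParallelism()` in the `jsc.parallelize(...)` call, so small tables still get few tasks while large tables can fan out further.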
##########
File path:
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -369,10 +343,56 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient
}
});
- LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
+ LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
update(commitMetadata, createInstantTime);
}
+ /**
+ * Function to find hoodie partitions and list files in them in parallel.
+ *
+ * @param jsc
+ * @param datasetMetaClient
+ * @return Map of partition names to a list of FileStatus for all the files in the partition
+ */
+ private Map<String, List<FileStatus>> parallelFileSystemListing(JavaSparkContext jsc,
+ HoodieTableMetaClient datasetMetaClient) {
+ List<Path> pathsToList = new LinkedList<>();
+ pathsToList.add(new Path(datasetWriteConfig.getBasePath()));
+ Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+
+ while (!pathsToList.isEmpty()) {
+ // List all directories in parallel
+ List<Pair<Path, FileStatus[]>> dirToFileListing =
+ jsc.parallelize(pathsToList, Math.min(pathsToList.size(), jsc.defaultParallelism()))
+ .map(path -> {
+ FileSystem fs = datasetMetaClient.getFs();
+ return Pair.of(path, fs.listStatus(path));
+ }).collect();
+ pathsToList.clear();
+
+ // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
+ // the results.
+ dirToFileListing.forEach(p -> {
+ List<FileStatus> filesInDir = Arrays.stream(p.getRight())
+ .filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
+ .collect(Collectors.toList());
Review comment:
Maybe `Arrays.stream().parallel()`? This can be a huge list to go through for large datasets.
##########
File path:
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -51,6 +51,7 @@
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
Review comment:
Can you re-use this same logic in `getAllPartitionPaths()` here:
https://github.com/apache/hudi/blob/rfc-15/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java#L149
The default mechanism is really slow. I found that `FsUtils.getAllPartitionPaths()` is used in multiple places across the Hudi code-base, all of which can benefit from RFC-15. So what I am thinking is: by default, `FsUtils.getAllPartitionPaths()` should always try the metadata table first, and internally, if the table is not present, fall back to this parallelized/optimized listing of partition paths.
That way it benefits both customers who use the metadata table and those who don't.
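The proposed control flow can be sketched as follows. This is purely illustrative: the class, method signature, and `Supplier` stand-ins are hypothetical and are not the actual `FsUtils` API; only the "prefer metadata table, fall back to parallel listing" decision comes from the comment above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class PartitionPathsFallbackSketch {
  // Sketch of the proposed FsUtils.getAllPartitionPaths() behavior:
  // prefer the metadata table when it exists, otherwise fall back to the
  // parallelized file-system listing. Both listings are passed in as
  // suppliers so the sketch stays self-contained.
  static List<String> getAllPartitionPaths(boolean metadataTableExists,
                                           Supplier<List<String>> metadataTableListing,
                                           Supplier<List<String>> parallelFsListing) {
    if (metadataTableExists) {
      return metadataTableListing.get();  // fast path: read from the metadata table
    }
    return parallelFsListing.get();       // fallback: optimized parallel listing
  }

  public static void main(String[] args) {
    List<String> paths = getAllPartitionPaths(
        false,
        () -> Arrays.asList("2020/12/01"),
        () -> Arrays.asList("2020/12/01", "2020/12/02"));
    System.out.println(paths); // falls back to the FS listing
  }
}
```

Because both branches return the same shape of result, callers across the code-base would not need to change: they transparently get the fast path when the metadata table exists.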
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]