nsivabalan commented on a change in pull request #4564:
URL: https://github.com/apache/hudi/pull/4564#discussion_r782122923
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
##########
@@ -276,39 +272,71 @@ static boolean copyFiles(
}
});
return results.iterator();
- })
- .collect();
+ }, true)
+ .collectAsList();
return allResults.stream().reduce((r1, r2) -> r1 && r2).orElse(false);
}
/**
* Lists all Hoodie files from the table base path.
*
- * @param basePathStr Table base path.
- * @param conf {@link Configuration} instance.
- * @return An array of {@link FileStatus} of all Hoodie files.
+ * @param context {@link HoodieEngineContext} instance.
+ * @param basePathStr Table base path.
+ * @param expectedLevel Expected level in the directory hierarchy to include
the file status.
+ * @param parallelism Parallelism for the file listing.
+ * @return A list of absolute file paths of all Hoodie files.
* @throws IOException upon errors.
*/
- static FileStatus[] listFilesFromBasePath(String basePathStr, Configuration
conf) throws IOException {
+ static List<String> listFilesFromBasePath(
+ HoodieEngineContext context, String basePathStr, int expectedLevel, int
parallelism) {
final Set<String> validFileExtensions =
Arrays.stream(HoodieFileFormat.values())
.map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new));
final String logFileExtension =
HoodieFileFormat.HOODIE_LOG.getFileExtension();
- FileSystem fs = FSUtils.getFs(basePathStr, conf);
+ FileSystem fs = FSUtils.getFs(basePathStr, context.getHadoopConf().get());
Path basePath = new Path(basePathStr);
+ return FSUtils.getFileStatusAtLevel(
+ context, fs, basePath, expectedLevel, parallelism).stream()
+ .filter(fileStatus -> {
+ if (!fileStatus.isFile()) {
+ return false;
+ }
+ Path path = fileStatus.getPath();
+ String extension = FSUtils.getFileExtension(path.getName());
+ return validFileExtensions.contains(extension) ||
path.getName().contains(logFileExtension);
Review comment:
   We have FSUtils.isDataFile(Path), which may assist you here.
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
##########
@@ -276,39 +272,71 @@ static boolean copyFiles(
}
});
return results.iterator();
- })
- .collect();
+ }, true)
+ .collectAsList();
return allResults.stream().reduce((r1, r2) -> r1 && r2).orElse(false);
}
/**
* Lists all Hoodie files from the table base path.
*
- * @param basePathStr Table base path.
- * @param conf {@link Configuration} instance.
- * @return An array of {@link FileStatus} of all Hoodie files.
+ * @param context {@link HoodieEngineContext} instance.
+ * @param basePathStr Table base path.
+ * @param expectedLevel Expected level in the directory hierarchy to include
the file status.
+ * @param parallelism Parallelism for the file listing.
+ * @return A list of absolute file paths of all Hoodie files.
* @throws IOException upon errors.
*/
- static FileStatus[] listFilesFromBasePath(String basePathStr, Configuration
conf) throws IOException {
+ static List<String> listFilesFromBasePath(
+ HoodieEngineContext context, String basePathStr, int expectedLevel, int
parallelism) {
final Set<String> validFileExtensions =
Arrays.stream(HoodieFileFormat.values())
.map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new));
final String logFileExtension =
HoodieFileFormat.HOODIE_LOG.getFileExtension();
- FileSystem fs = FSUtils.getFs(basePathStr, conf);
+ FileSystem fs = FSUtils.getFs(basePathStr, context.getHadoopConf().get());
Path basePath = new Path(basePathStr);
+ return FSUtils.getFileStatusAtLevel(
+ context, fs, basePath, expectedLevel, parallelism).stream()
+ .filter(fileStatus -> {
+ if (!fileStatus.isFile()) {
+ return false;
+ }
+ Path path = fileStatus.getPath();
+ String extension = FSUtils.getFileExtension(path.getName());
+ return validFileExtensions.contains(extension) ||
path.getName().contains(logFileExtension);
+ })
+ .map(fileStatus -> fileStatus.getPath().toString())
+ .collect(Collectors.toList());
+ }
- try {
- return Arrays.stream(fs.listStatus(basePath, path -> {
- String extension = FSUtils.getFileExtension(path.getName());
- return validFileExtensions.contains(extension) ||
path.getName().contains(logFileExtension);
- })).filter(FileStatus::isFile).toArray(FileStatus[]::new);
- } catch (IOException e) {
- // return empty FileStatus if partition does not exist already
- if (!fs.exists(basePath)) {
- return new FileStatus[0];
- } else {
- throw e;
- }
- }
+ /**
+ * Deletes files from table base path.
+ *
+ * @param context {@link HoodieEngineContext} instance.
+ * @param basePath Base path of the table.
+ * @param relativeFilePaths A {@link List} of relative file paths for
deleting.
+ */
+ static boolean deleteFiles(
+ HoodieEngineContext context, String basePath, List<String>
relativeFilePaths) {
+ SerializableConfiguration conf = context.getHadoopConf();
+ return context.parallelize(relativeFilePaths)
+ .mapPartitions(iterator -> {
Review comment:
   We can just use map instead of mapPartitions — any particular reason? I am
fine either way, just curious.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]