yihua commented on a change in pull request #4564:
URL: https://github.com/apache/hudi/pull/4564#discussion_r782543705



##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
##########
@@ -276,39 +272,71 @@ static boolean copyFiles(
             }
           });
           return results.iterator();
-        })
-        .collect();
+        }, true)
+        .collectAsList();
     return allResults.stream().reduce((r1, r2) -> r1 && r2).orElse(false);
   }
 
   /**
    * Lists all Hoodie files from the table base path.
    *
-   * @param basePathStr Table base path.
-   * @param conf        {@link Configuration} instance.
-   * @return An array of {@link FileStatus} of all Hoodie files.
+   * @param context       {@link HoodieEngineContext} instance.
+   * @param basePathStr   Table base path.
+   * @param expectedLevel Expected level in the directory hierarchy to include 
the file status.
+   * @param parallelism   Parallelism for the file listing.
+   * @return A list of absolute file paths of all Hoodie files.
    * @throws IOException upon errors.
    */
-  static FileStatus[] listFilesFromBasePath(String basePathStr, Configuration 
conf) throws IOException {
+  static List<String> listFilesFromBasePath(
+      HoodieEngineContext context, String basePathStr, int expectedLevel, int 
parallelism) {
     final Set<String> validFileExtensions = 
Arrays.stream(HoodieFileFormat.values())
         
.map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new));
     final String logFileExtension = 
HoodieFileFormat.HOODIE_LOG.getFileExtension();
-    FileSystem fs = FSUtils.getFs(basePathStr, conf);
+    FileSystem fs = FSUtils.getFs(basePathStr, context.getHadoopConf().get());
     Path basePath = new Path(basePathStr);
+    return FSUtils.getFileStatusAtLevel(
+            context, fs, basePath, expectedLevel, parallelism).stream()
+        .filter(fileStatus -> {
+          if (!fileStatus.isFile()) {
+            return false;
+          }
+          Path path = fileStatus.getPath();
+          String extension = FSUtils.getFileExtension(path.getName());
+          return validFileExtensions.contains(extension) || 
path.getName().contains(logFileExtension);
+        })
+        .map(fileStatus -> fileStatus.getPath().toString())
+        .collect(Collectors.toList());
+  }
 
-    try {
-      return Arrays.stream(fs.listStatus(basePath, path -> {
-        String extension = FSUtils.getFileExtension(path.getName());
-        return validFileExtensions.contains(extension) || 
path.getName().contains(logFileExtension);
-      })).filter(FileStatus::isFile).toArray(FileStatus[]::new);
-    } catch (IOException e) {
-      // return empty FileStatus if partition does not exist already
-      if (!fs.exists(basePath)) {
-        return new FileStatus[0];
-      } else {
-        throw e;
-      }
-    }
+  /**
+   * Deletes files from table base path.
+   *
+   * @param context           {@link HoodieEngineContext} instance.
+   * @param basePath          Base path of the table.
+   * @param relativeFilePaths A {@link List} of relative file paths for 
deleting.
+   */
+  static boolean deleteFiles(
+      HoodieEngineContext context, String basePath, List<String> 
relativeFilePaths) {
+    SerializableConfiguration conf = context.getHadoopConf();
+    return context.parallelize(relativeFilePaths)
+        .mapPartitions(iterator -> {

Review comment:
       Yeah, here I'm using `mapPartitions` to reduce the cost of getting a 
FileSystem instance, so it is initialized once per partition instead of once 
per file.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to