Apache9 commented on a change in pull request #513: HBASE-22867 The 
ForkJoinPool in CleanerChore will spawn thousands of threads in our cluster 
with thousands table
URL: https://github.com/apache/hbase/pull/513#discussion_r315965283
 
 

 ##########
 File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
 ##########
 @@ -380,126 +384,94 @@ public boolean setEnabled(final boolean enabled) {
   }
 
   private interface Action<T> {
-    T act() throws IOException;
+    T act() throws Exception;
   }
 
   /**
-   * Attemps to clean up a directory, its subdirectories, and files. Return 
value is true if
-   * everything was deleted. false on partial / total failures.
+   * Attemps to clean up a directory, its subdirectories, and files.
    */
-  private final class CleanerTask extends RecursiveTask<Boolean> {
-
-    private static final long serialVersionUID = -5444212174088754172L;
-
-    private final Path dir;
-    private final boolean root;
-
-    CleanerTask(final FileStatus dir, final boolean root) {
-      this(dir.getPath(), root);
-    }
-
-    CleanerTask(final Path dir, final boolean root) {
-      this.dir = dir;
-      this.root = root;
-    }
-
-    @Override
-    protected Boolean compute() {
-      LOG.trace("Cleaning under {}", dir);
-      List<FileStatus> subDirs;
-      List<FileStatus> files;
-      try {
-        // if dir doesn't exist, we'll get null back for both of these
-        // which will fall through to succeeding.
-        subDirs = getFilteredStatus(FileStatus::isDirectory);
-        files = getFilteredStatus(FileStatus::isFile);
-      } catch (IOException ioe) {
-        LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe);
-        return false;
-      }
-
-      boolean allFilesDeleted = true;
-      if (!files.isEmpty()) {
-        allFilesDeleted = deleteAction(() -> checkAndDeleteFiles(files), 
"files");
-      }
-
-      boolean allSubdirsDeleted = true;
+  private void traverseAndDelete(Path dir, boolean root, 
CompletableFuture<Boolean> result) {
+    try {
+      // Step.1: List all files under the given directory.
+      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
+      List<FileStatus> subDirs =
+          
allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
+      List<FileStatus> files =
+          
allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
+
+      // Step.2: Try to delete all the deletable files.
+      boolean allFilesDeleted =
+          files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), 
"files", dir);
+
+      // Step.3: Start to traverse and delete the sub-directories.
+      List<CompletableFuture<Boolean>> futures = new ArrayList<>();
       if (!subDirs.isEmpty()) {
-        List<CleanerTask> tasks = 
Lists.newArrayListWithCapacity(subDirs.size());
         sortByConsumedSpace(subDirs);
-        for (FileStatus subdir : subDirs) {
-          CleanerTask task = new CleanerTask(subdir, false);
-          tasks.add(task);
-          task.fork();
-        }
-        allSubdirsDeleted = deleteAction(() -> getCleanResult(tasks), 
"subdirs");
+        // Submit the request of sub-directory deletion.
+        subDirs.forEach(subDir -> {
+          CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
+          pool.runAsync(() -> traverseAndDelete(subDir.getPath(), false, 
subFuture));
+          futures.add(subFuture);
+        });
       }
 
-      boolean result = allFilesDeleted && allSubdirsDeleted && 
isEmptyDirDeletable(dir);
-      // if and only if files and subdirs under current dir are deleted 
successfully, and the empty
-      // directory can be deleted, and it is not the root dir then task will 
try to delete it.
-      if (result && !root) {
-        result &= deleteAction(() -> fs.delete(dir, false), "dir");
-      }
-      return result;
-    }
-
-    /**
-     * Get FileStatus with filter.
-     * @param function a filter function
-     * @return filtered FileStatus or empty list if dir doesn't exist
-     * @throws IOException if there's an error other than dir not existing
-     */
-    private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) 
throws IOException {
-      return Optional.ofNullable(FSUtils.listStatusWithStatusFilter(fs, dir,
-        status -> function.test(status))).orElseGet(Collections::emptyList);
-    }
-
-    /**
-     * Perform a delete on a specified type.
-     * @param deletion a delete
-     * @param type possible values are 'files', 'subdirs', 'dirs'
-     * @return true if it deleted successfully, false otherwise
-     */
-    private boolean deleteAction(Action<Boolean> deletion, String type) {
-      boolean deleted;
-      try {
-        LOG.trace("Start deleting {} under {}", type, dir);
-        deleted = deletion.act();
-      } catch (PathIsNotEmptyDirectoryException exception) {
-        // N.B. HDFS throws this exception when we try to delete a non-empty 
directory, but
-        // LocalFileSystem throws a bare IOException. So some test code will 
get the verbose
-        // message below.
-        LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably 
transient. " +
-            "exception details at TRACE.", dir);
-        LOG.trace("Couldn't delete '{}' yet because it isn't empty 
w/exception.", dir, exception);
-        deleted = false;
-      } catch (IOException ioe) {
-        LOG.info("Could not delete {} under {}. might be transient; we'll 
retry. if it keeps " +
-                  "happening, use following exception when asking on mailing 
list.",
-                  type, dir, ioe);
-        deleted = false;
-      }
-      LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
-      return deleted;
+      // Step.4: Once all sub-files & sub-directories are deleted, then can 
try to delete the
+      // current directory asynchronously.
+      CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[futures.size()]))
+          .whenComplete((voidObj, e) -> {
 
 Review comment:
   It would be better to use FutureUtils.addListener here. Although we do not currently run
error-prone checks in pre-commit, this code would trigger a FutureReturnValueIgnored warning,
and that warning points to a real problem: if the code in whenComplete throws an exception,
the current implementation hides it and all you will see is the program hanging. Using
FutureUtils.addListener will at least log the exception stack trace.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to