Copilot commented on code in PR #9686:
URL: https://github.com/apache/ozone/pull/9686#discussion_r2739098363


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +142,183 @@ protected void execTaskCompletion() { }
    * Run one or more background tasks concurrently.
    * Wait until all tasks to return the result.
    */
-  public class PeriodicalTask implements Runnable {
-    @Override
-    public void run() {
-      // wait for previous set of tasks to complete
-      try {
-        future.join();
-      } catch (RuntimeException e) {
-        LOG.error("Background service execution failed.", e);
-      } finally {
-        execTaskCompletion();
-      }
+  public class PeriodicalTask extends RecursiveAction {
+    private final Queue<BackgroundTaskForkJoin> tasksInFlight;
+    private final AtomicReference<Boolean> isShutdown;
+    private final ScheduledExecutorService scheduledExecuterService;
+
+    public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) {
+      this.tasksInFlight = new LinkedList<>();
+      this.isShutdown = BackgroundService.this.isShutdown;
+      this.scheduledExecuterService = scheduledExecutorService;
+    }
+
+    private PeriodicalTask(PeriodicalTask other) {
+      this.tasksInFlight = other.tasksInFlight;
+      this.isShutdown = other.isShutdown;
+      this.scheduledExecuterService = other.scheduledExecuterService;
+    }
+
+    private boolean performIfNotShutdown(Runnable runnable) {
+      return isShutdown.updateAndGet((shutdown) -> {
+        if (!shutdown) {
+          runnable.run();
+        }
+        return shutdown;
+      });
+    }
 
+    private <T> boolean performIfNotShutdown(Consumer<T> consumer, T t) {
+      return isShutdown.updateAndGet((shutdown) -> {
+        if (!shutdown) {
+          consumer.accept(t);
+        }
+        return shutdown;
+      });
+    }
+
+    private boolean runTasks() {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Running background service : {}", serviceName);
       }
-      BackgroundTaskQueue tasks = getTasks();
+      if (isShutdown.get()) {
+        return false;
+      }
+      if (!tasksInFlight.isEmpty()) {
+        LOG.warn("Tasks are still in flight service {}. This should not happen 
schedule should only begin once all " +
+            "tasks from schedules have completed execution.", serviceName);
+        tasksInFlight.clear();
+      }
+
+      BackgroundTaskQueue tasks = getTasks(true);
       if (tasks.isEmpty()) {
         // No task found, or some problems to init tasks
         // return and retry in next interval.
-        return;
+        return false;
       }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Number of background tasks to execute : {}", tasks.size());
       }
-      synchronized (BackgroundService.this) {
-        while (!tasks.isEmpty()) {
-          BackgroundTask task = tasks.poll();
-          future = future.thenCombine(CompletableFuture.runAsync(() -> {
-            long startTime = System.nanoTime();
-            try {
-              BackgroundTaskResult result = task.call();
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("task execution result size {}", result.getSize());
-              }
-            } catch (Throwable e) {
-              LOG.error("Background task execution failed", e);
-              if (e instanceof Error) {
-                throw (Error) e;
-              }
-            } finally {
-              long endTime = System.nanoTime();
-              if (endTime - startTime > serviceTimeoutInNanos) {
-                LOG.warn("{} Background task execution took {}ns > 
{}ns(timeout)",
-                    serviceName, endTime - startTime, serviceTimeoutInNanos);
-              }
-            }
-          }, exec).exceptionally(e -> null), (Void1, Void) -> null);
+      Consumer<BackgroundTaskForkJoin> taskForkHandler = task -> {
+        task.fork();
+        tasksInFlight.offer(task);
+      };
+      while (!tasks.isEmpty()) {
+        BackgroundTask task = tasks.poll();
+        // Wrap the task in a ForkJoin wrapper and fork it.
+        BackgroundTaskForkJoin forkJoinTask = new BackgroundTaskForkJoin(task);
+        if (performIfNotShutdown(taskForkHandler, forkJoinTask)) {
+          return false;
+        }
+      }
+      Consumer<BackgroundTaskForkJoin> taskCompletionHandler = task -> {
+        BackgroundTaskForkJoin.BackgroundTaskForkResult result = task.join();
+        // Check for exception first in the task execution.
+        if (result.getThrowable() != null) {
+          LOG.error("Background task execution failed", result.getThrowable());
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("task execution result size {}", 
result.getResult().getSize());
+          }
+        }
+        if (result.getTotalExecutionTime() > serviceTimeoutInNanos) {
+          LOG.warn("{} Background task execution took {}ns > {}ns(timeout)",
+              serviceName, result.getTotalExecutionTime(), 
serviceTimeoutInNanos);
+        }
+      };
+      while (!tasksInFlight.isEmpty()) {
+        BackgroundTaskForkJoin taskInFlight = tasksInFlight.poll();
+        // Join the tasks forked before and wait for the result one by one.
+        if (performIfNotShutdown(taskCompletionHandler, taskInFlight)) {
+          return false;
         }
       }
+      return true;
+    }
+
+    private void scheduleNextTask() {
+      performIfNotShutdown(() -> {
+        if (scheduledExecuterService != null) {
+          scheduledExecuterService.schedule(() -> exec.submit(new 
PeriodicalTask(this)),
+              intervalInMillis, TimeUnit.MILLISECONDS);

Review Comment:
   The scheduled runnable in `scheduleNextTask()` calls `exec.submit(...)` 
without re-checking shutdown state or whether `exec` is still non-null. Since 
`shutdown()` sets `exec = null` and the shared scheduler uses `shutdown()` (not 
`shutdownNow()`), already-scheduled tasks can still execute after shutdown and 
hit NPE / `RejectedExecutionException`. Guard inside the scheduled runnable 
(check shutdown flag + `exec != null`) and/or cancel scheduled tasks on 
shutdown.
   ```suggestion
             scheduledExecuterService.schedule(() -> {
               if (!isShutdown.get() && exec != null) {
                 exec.submit(new PeriodicalTask(this));
               }
             }, intervalInMillis, TimeUnit.MILLISECONDS);
   ```



##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+import java.util.concurrent.RecursiveTask;
+
+/**
+ * A ForkJoin wrapper for {@link BackgroundTask} that enables parallel 
execution
+ * in a ForkJoinPool while keeping the BackgroundTask interface simple.
+ * 
+ * <p>This wrapper handles the RecursiveTask mechanics, timing, and exception
+ * handling, allowing BackgroundTask implementations to focus on their 
business logic.
+ */
+public class BackgroundTaskForkJoin extends 
RecursiveTask<BackgroundTaskForkJoin.BackgroundTaskForkResult> {
+  private static final long serialVersionUID = 1L;
+  private final transient BackgroundTask backgroundTask;
+
+  public BackgroundTaskForkJoin(BackgroundTask backgroundTask) {

Review Comment:
   This class is added under the `hdds-common` module but depends on 
`BackgroundTask`/`BackgroundTaskResult`, which are defined in 
`hdds-server-framework` (and `hdds-common` does not depend on it). Since 
`hdds-server-framework` already depends on `hdds-common`, adding the reverse 
dependency would create a cycle; as-is, this likely won’t compile. Consider 
moving this class into `hdds-server-framework`, or into a new shared module 
that both can depend on.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +142,183 @@ protected void execTaskCompletion() { }
    * Run one or more background tasks concurrently.
    * Wait until all tasks to return the result.
    */
-  public class PeriodicalTask implements Runnable {
-    @Override
-    public void run() {
-      // wait for previous set of tasks to complete
-      try {
-        future.join();
-      } catch (RuntimeException e) {
-        LOG.error("Background service execution failed.", e);
-      } finally {
-        execTaskCompletion();
-      }
+  public class PeriodicalTask extends RecursiveAction {
+    private final Queue<BackgroundTaskForkJoin> tasksInFlight;
+    private final AtomicReference<Boolean> isShutdown;
+    private final ScheduledExecutorService scheduledExecuterService;
+
+    public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) {
+      this.tasksInFlight = new LinkedList<>();
+      this.isShutdown = BackgroundService.this.isShutdown;
+      this.scheduledExecuterService = scheduledExecutorService;
+    }
+
+    private PeriodicalTask(PeriodicalTask other) {
+      this.tasksInFlight = other.tasksInFlight;
+      this.isShutdown = other.isShutdown;
+      this.scheduledExecuterService = other.scheduledExecuterService;
+    }
+
+    private boolean performIfNotShutdown(Runnable runnable) {
+      return isShutdown.updateAndGet((shutdown) -> {
+        if (!shutdown) {
+          runnable.run();
+        }
+        return shutdown;
+      });
+    }
 
+    private <T> boolean performIfNotShutdown(Consumer<T> consumer, T t) {
+      return isShutdown.updateAndGet((shutdown) -> {
+        if (!shutdown) {
+          consumer.accept(t);
+        }
+        return shutdown;
+      });
+    }
+
+    private boolean runTasks() {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Running background service : {}", serviceName);
       }
-      BackgroundTaskQueue tasks = getTasks();
+      if (isShutdown.get()) {
+        return false;
+      }
+      if (!tasksInFlight.isEmpty()) {
+        LOG.warn("Tasks are still in flight service {}. This should not happen 
schedule should only begin once all " +
+            "tasks from schedules have completed execution.", serviceName);
+        tasksInFlight.clear();
+      }
+
+      BackgroundTaskQueue tasks = getTasks(true);
       if (tasks.isEmpty()) {
         // No task found, or some problems to init tasks
         // return and retry in next interval.
-        return;
+        return false;
       }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Number of background tasks to execute : {}", tasks.size());
       }
-      synchronized (BackgroundService.this) {
-        while (!tasks.isEmpty()) {
-          BackgroundTask task = tasks.poll();
-          future = future.thenCombine(CompletableFuture.runAsync(() -> {
-            long startTime = System.nanoTime();
-            try {
-              BackgroundTaskResult result = task.call();
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("task execution result size {}", result.getSize());
-              }
-            } catch (Throwable e) {
-              LOG.error("Background task execution failed", e);
-              if (e instanceof Error) {
-                throw (Error) e;
-              }
-            } finally {
-              long endTime = System.nanoTime();
-              if (endTime - startTime > serviceTimeoutInNanos) {
-                LOG.warn("{} Background task execution took {}ns > 
{}ns(timeout)",
-                    serviceName, endTime - startTime, serviceTimeoutInNanos);
-              }
-            }
-          }, exec).exceptionally(e -> null), (Void1, Void) -> null);
+      Consumer<BackgroundTaskForkJoin> taskForkHandler = task -> {
+        task.fork();
+        tasksInFlight.offer(task);

Review Comment:
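   The `taskForkHandler` lambda (its `accept` method) ignores the boolean 
return value of `Queue<BackgroundTaskForkJoin>.offer`, which reports a failed 
insertion by returning `false` rather than by throwing.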
   ```suggestion
           if (!tasksInFlight.offer(task)) {
             LOG.error("Failed to enqueue background task for service {}. Task 
will not be tracked.", serviceName);
           }
   ```



##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+import java.util.concurrent.RecursiveTask;
+
+/**
+ * A ForkJoin wrapper for {@link BackgroundTask} that enables parallel 
execution
+ * in a ForkJoinPool while keeping the BackgroundTask interface simple.
+ * 
+ * <p>This wrapper handles the RecursiveTask mechanics, timing, and exception
+ * handling, allowing BackgroundTask implementations to focus on their 
business logic.
+ */
+public class BackgroundTaskForkJoin extends 
RecursiveTask<BackgroundTaskForkJoin.BackgroundTaskForkResult> {
+  private static final long serialVersionUID = 1L;
+  private final transient BackgroundTask backgroundTask;
+
+  public BackgroundTaskForkJoin(BackgroundTask backgroundTask) {
+    this.backgroundTask = backgroundTask;
+  }
+
+  /**
+   * Result wrapper containing the task result, execution time, and any 
exception.
+   */
+  public static final class BackgroundTaskForkResult {
+    private final BackgroundTaskResult result;
+    private final Throwable throwable;
+    private final long startTime;
+    private final long endTime;
+
+    private BackgroundTaskForkResult(BackgroundTaskResult result, long 
startTime, long endTime, Throwable throwable) {
+      this.endTime = endTime;
+      this.result = result;
+      this.startTime = startTime;
+      this.throwable = throwable;
+    }
+
+    public long getTotalExecutionTime() {
+      return endTime - startTime;
+    }
+
+    public BackgroundTaskResult getResult() {
+      return result;
+    }
+
+    public Throwable getThrowable() {
+      return throwable;
+    }
+  }
+
+  @Override
+  protected BackgroundTaskForkResult compute() {
+    long startTime = System.nanoTime();
+    BackgroundTaskResult result = null;
+    Throwable throwable = null;
+    try {
+      result = backgroundTask.call();
+    } catch (Throwable e) {
+      throwable = e;

Review Comment:
   `BackgroundTaskForkJoin.compute()` catches all `Throwable` and only stores 
it in the result, which means `Error`s (e.g., `OutOfMemoryError`) will be 
swallowed and the pool will keep running. The previous BackgroundService 
implementation explicitly rethrew `Error`. Consider rethrowing `Error` after 
capturing/logging (or letting it propagate) so fatal JVM conditions aren’t 
masked.
   ```suggestion
         throwable = e;
         if (e instanceof Error) {
           throw (Error) e;
         }
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java:
##########
@@ -255,6 +255,9 @@ public BackgroundTaskResult call() throws Exception {
                       .getSnapshotId());
                 }
               }
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              LOG.error("SST filtering task interrupted for snapshot: {}", 
snapShotTableKey, e);

Review Comment:
   After catching `InterruptedException`, the code re-interrupts the thread but 
continues processing subsequent snapshots. Typically interruption should stop 
the task (e.g., break the loop / return) to allow the service to shut down 
promptly and avoid doing more work on an interrupted thread.
   ```suggestion
                 LOG.error("SST filtering task interrupted for snapshot: {}", 
snapShotTableKey, e);
                 break;
   ```



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +142,183 @@ protected void execTaskCompletion() { }
    * Run one or more background tasks concurrently.
    * Wait until all tasks to return the result.
    */
-  public class PeriodicalTask implements Runnable {
-    @Override
-    public void run() {
-      // wait for previous set of tasks to complete
-      try {
-        future.join();
-      } catch (RuntimeException e) {
-        LOG.error("Background service execution failed.", e);
-      } finally {
-        execTaskCompletion();
-      }
+  public class PeriodicalTask extends RecursiveAction {
+    private final Queue<BackgroundTaskForkJoin> tasksInFlight;
+    private final AtomicReference<Boolean> isShutdown;
+    private final ScheduledExecutorService scheduledExecuterService;
+
+    public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) {
+      this.tasksInFlight = new LinkedList<>();
+      this.isShutdown = BackgroundService.this.isShutdown;
+      this.scheduledExecuterService = scheduledExecutorService;
+    }
+
+    private PeriodicalTask(PeriodicalTask other) {
+      this.tasksInFlight = other.tasksInFlight;
+      this.isShutdown = other.isShutdown;
+      this.scheduledExecuterService = other.scheduledExecuterService;
+    }
+
+    private boolean performIfNotShutdown(Runnable runnable) {
+      return isShutdown.updateAndGet((shutdown) -> {
+        if (!shutdown) {
+          runnable.run();
+        }
+        return shutdown;
+      });
+    }
 
+    private <T> boolean performIfNotShutdown(Consumer<T> consumer, T t) {
+      return isShutdown.updateAndGet((shutdown) -> {
+        if (!shutdown) {
+          consumer.accept(t);
+        }
+        return shutdown;
+      });
+    }
+
+    private boolean runTasks() {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Running background service : {}", serviceName);
       }
-      BackgroundTaskQueue tasks = getTasks();
+      if (isShutdown.get()) {
+        return false;
+      }
+      if (!tasksInFlight.isEmpty()) {
+        LOG.warn("Tasks are still in flight service {}. This should not happen 
schedule should only begin once all " +
+            "tasks from schedules have completed execution.", serviceName);
+        tasksInFlight.clear();
+      }
+
+      BackgroundTaskQueue tasks = getTasks(true);
       if (tasks.isEmpty()) {
         // No task found, or some problems to init tasks
         // return and retry in next interval.
-        return;
+        return false;
       }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Number of background tasks to execute : {}", tasks.size());
       }
-      synchronized (BackgroundService.this) {
-        while (!tasks.isEmpty()) {
-          BackgroundTask task = tasks.poll();
-          future = future.thenCombine(CompletableFuture.runAsync(() -> {
-            long startTime = System.nanoTime();
-            try {
-              BackgroundTaskResult result = task.call();
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("task execution result size {}", result.getSize());
-              }
-            } catch (Throwable e) {
-              LOG.error("Background task execution failed", e);
-              if (e instanceof Error) {
-                throw (Error) e;
-              }
-            } finally {
-              long endTime = System.nanoTime();
-              if (endTime - startTime > serviceTimeoutInNanos) {
-                LOG.warn("{} Background task execution took {}ns > 
{}ns(timeout)",
-                    serviceName, endTime - startTime, serviceTimeoutInNanos);
-              }
-            }
-          }, exec).exceptionally(e -> null), (Void1, Void) -> null);
+      Consumer<BackgroundTaskForkJoin> taskForkHandler = task -> {
+        task.fork();
+        tasksInFlight.offer(task);
+      };
+      while (!tasks.isEmpty()) {
+        BackgroundTask task = tasks.poll();
+        // Wrap the task in a ForkJoin wrapper and fork it.
+        BackgroundTaskForkJoin forkJoinTask = new BackgroundTaskForkJoin(task);
+        if (performIfNotShutdown(taskForkHandler, forkJoinTask)) {
+          return false;
+        }
+      }
+      Consumer<BackgroundTaskForkJoin> taskCompletionHandler = task -> {
+        BackgroundTaskForkJoin.BackgroundTaskForkResult result = task.join();
+        // Check for exception first in the task execution.
+        if (result.getThrowable() != null) {
+          LOG.error("Background task execution failed", result.getThrowable());
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("task execution result size {}", 
result.getResult().getSize());
+          }
+        }
+        if (result.getTotalExecutionTime() > serviceTimeoutInNanos) {
+          LOG.warn("{} Background task execution took {}ns > {}ns(timeout)",
+              serviceName, result.getTotalExecutionTime(), 
serviceTimeoutInNanos);
+        }
+      };
+      while (!tasksInFlight.isEmpty()) {
+        BackgroundTaskForkJoin taskInFlight = tasksInFlight.poll();
+        // Join the tasks forked before and wait for the result one by one.
+        if (performIfNotShutdown(taskCompletionHandler, taskInFlight)) {
+          return false;
         }
       }
+      return true;
+    }
+
+    private void scheduleNextTask() {
+      performIfNotShutdown(() -> {
+        if (scheduledExecuterService != null) {
+          scheduledExecuterService.schedule(() -> exec.submit(new 
PeriodicalTask(this)),
+              intervalInMillis, TimeUnit.MILLISECONDS);
+        }
+      });
+    }
+
+    @Override
+    public void compute() {
+      future = new CompletableFuture<>();
+      if (runTasks()) {
+        scheduleNextTask();
+      } else {
+        LOG.debug("Service {} is shutdown. Cancelling all schedules of all 
tasks.", serviceName);
+      }
+      future.complete(null);
     }
   }
 
   // shutdown and make sure all threads are properly released.
-  public synchronized void shutdown() {
+  public void shutdown() {
     LOG.info("Shutting down service {}", this.serviceName);
-    exec.shutdown();
-    try {
-      if (!exec.awaitTermination(60, TimeUnit.SECONDS)) {
-        exec.shutdownNow();
+    final ThreadGroup threadGroupToBeClosed;
+    final ForkJoinPool execToShutdown;
+    final UncheckedAutoCloseableSupplier<ScheduledExecutorService> 
periodicTaskSchedulerToBeClosed;
+    // Set the shutdown flag to true to prevent new tasks from being submitted.
+    synchronized (this) {
+      periodicTaskSchedulerToBeClosed = periodicTaskScheduler;
+      threadGroupToBeClosed = threadGroup;
+      execToShutdown = exec;
+      exec = null;
+      threadGroup = null;
+      periodicTaskScheduler = null;
+      if (isShutdown != null) {
+        this.isShutdown.set(true);
+      }
+      isShutdown = null;
+    }
+    if (execToShutdown != null) {
+      execToShutdown.shutdown();
+      try {
+        if (!execToShutdown.awaitTermination(60, TimeUnit.SECONDS)) {
+          execToShutdown.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        // Re-interrupt the thread while catching InterruptedException
+        Thread.currentThread().interrupt();
+        execToShutdown.shutdownNow();
       }
-    } catch (InterruptedException e) {
-      // Re-interrupt the thread while catching InterruptedException
-      Thread.currentThread().interrupt();
-      exec.shutdownNow();
     }
-    if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) {
-      threadGroup.destroy();
+    if (periodicTaskSchedulerToBeClosed != null) {
+      periodicTaskSchedulerToBeClosed.close();
+    }
+    if (threadGroupToBeClosed != null && !threadGroupToBeClosed.isDestroyed()) 
{
+      threadGroupToBeClosed.destroy();
     }
   }
 
-  private void initExecutorAndThreadGroup() {
-    threadGroup = new ThreadGroup(serviceName);
-    ThreadFactory threadFactory = new ThreadFactoryBuilder()
-        .setThreadFactory(r -> new Thread(threadGroup, r))
-        .setDaemon(true)
-        .setNameFormat(threadNamePrefix + serviceName + "#%d")
-        .build();
-    exec = (ScheduledThreadPoolExecutor) 
Executors.newScheduledThreadPool(threadPoolSize, threadFactory);
+  private synchronized void initExecutorAndThreadGroup() {
+    try {
+      threadGroup = new ThreadGroup(serviceName);
+      Thread initThread = new Thread(threadGroup, () -> {
+        ForkJoinPool.ForkJoinWorkerThreadFactory factory =
+            pool -> {
+              ForkJoinWorkerThread thread = new ForkJoinWorkerThread(pool) {
+              };
+              thread.setDaemon(true);
+              thread.setName(threadNamePrefix + serviceName + 
thread.getPoolIndex());
+              return thread;
+            };
+        exec = new ForkJoinPool(threadPoolSize, factory, null, false);
+        isShutdown = new AtomicReference<>(false);

Review Comment:
   Background tasks frequently do blocking I/O (e.g., OM Ratis `submitRequest`, 
RocksDB calls). Using a `ForkJoinPool` for blocking work can reduce parallelism 
and hurt throughput unless blocking sections use 
`ForkJoinPool.managedBlock(...)` (or a dedicated blocking pool is used). 
Consider addressing blocking sections or documenting why ForkJoinPool is safe 
here.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +142,183 @@ protected void execTaskCompletion() { }
    * Run one or more background tasks concurrently.
    * Wait until all tasks to return the result.
    */
-  public class PeriodicalTask implements Runnable {
-    @Override
-    public void run() {
-      // wait for previous set of tasks to complete
-      try {
-        future.join();
-      } catch (RuntimeException e) {
-        LOG.error("Background service execution failed.", e);
-      } finally {
-        execTaskCompletion();
-      }
+  public class PeriodicalTask extends RecursiveAction {
+    private final Queue<BackgroundTaskForkJoin> tasksInFlight;
+    private final AtomicReference<Boolean> isShutdown;
+    private final ScheduledExecutorService scheduledExecuterService;
+
+    public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) {
+      this.tasksInFlight = new LinkedList<>();
+      this.isShutdown = BackgroundService.this.isShutdown;
+      this.scheduledExecuterService = scheduledExecutorService;
+    }

Review Comment:
   `PeriodicalTask` no longer has a no-arg constructor. There are existing call 
sites that still use `new PeriodicalTask()` (e.g., 
`hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java:68`), 
which will fail compilation. Either restore a no-arg constructor delegating to 
`this(null)` or update all call sites.



##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundServiceScheduler.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import org.apache.ratis.util.ReferenceCountedObject;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+
+/**
+ * Utility class to manage a shared background service using a {@link 
ScheduledExecutorService}
+ * which is provided with a single-threaded {@link 
ScheduledThreadPoolExecutor}.
+ * This class manages the lifecycle and reference counting for the executor
+ * to ensure proper resource cleanup.
+ *
+ * The executor is lazily initialized on the first invocation of the {@code 
get()} method.
+ * It is shut down and released when no longer referenced, ensuring efficient 
use
+ * of system resources. The shutdown process includes cleaning the reference 
to the executor.
+ *
+ * This class is thread-safe.
+ */
+final class BackgroundServiceScheduler {
+  private static ReferenceCountedObject<ScheduledExecutorService> executor;
+
+  private BackgroundServiceScheduler() {
+
+  }
+
+  public static synchronized 
UncheckedAutoCloseableSupplier<ScheduledExecutorService> get() {
+    if (executor == null) {
+      ScheduledThreadPoolExecutor scheduler = new 
ScheduledThreadPoolExecutor(1);
+      executor = ReferenceCountedObject.wrap(scheduler, () -> { }, (shutdown) 
-> {

Review Comment:
   `BackgroundServiceScheduler` uses the default thread factory for 
`ScheduledThreadPoolExecutor`, which creates non-daemon threads. Since this is 
a shared background scheduler, a non-daemon thread can keep the JVM alive if 
something forgets to close/release it. Consider using a daemon thread factory 
(and naming the thread) for the scheduler.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +142,183 @@ protected void execTaskCompletion() { }
    * Run one or more background tasks concurrently.
    * Wait until all tasks to return the result.
    */
-  public class PeriodicalTask implements Runnable {
-    @Override
-    public void run() {
-      // wait for previous set of tasks to complete
-      try {
-        future.join();
-      } catch (RuntimeException e) {
-        LOG.error("Background service execution failed.", e);
-      } finally {
-        execTaskCompletion();
-      }
+  public class PeriodicalTask extends RecursiveAction {
+    private final Queue<BackgroundTaskForkJoin> tasksInFlight;
+    private final AtomicReference<Boolean> isShutdown;
+    private final ScheduledExecutorService scheduledExecuterService;
+
+    public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) {
+      this.tasksInFlight = new LinkedList<>();
+      this.isShutdown = BackgroundService.this.isShutdown;
+      this.scheduledExecuterService = scheduledExecutorService;
+    }
+
+    private PeriodicalTask(PeriodicalTask other) {
+      this.tasksInFlight = other.tasksInFlight;
+      this.isShutdown = other.isShutdown;
+      this.scheduledExecuterService = other.scheduledExecuterService;
+    }
+
+    private boolean performIfNotShutdown(Runnable runnable) {
+      return isShutdown.updateAndGet((shutdown) -> {
+        if (!shutdown) {
+          runnable.run();
+        }
+        return shutdown;
+      });
+    }
 
+    private <T> boolean performIfNotShutdown(Consumer<T> consumer, T t) {
+      return isShutdown.updateAndGet((shutdown) -> {
+        if (!shutdown) {
+          consumer.accept(t);
+        }
+        return shutdown;
+      });
+    }
+
+    private boolean runTasks() {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Running background service : {}", serviceName);
       }
-      BackgroundTaskQueue tasks = getTasks();
+      if (isShutdown.get()) {
+        return false;
+      }
+      if (!tasksInFlight.isEmpty()) {
+        LOG.warn("Tasks are still in flight service {}. This should not happen 
schedule should only begin once all " +
+            "tasks from schedules have completed execution.", serviceName);
+        tasksInFlight.clear();
+      }
+
+      BackgroundTaskQueue tasks = getTasks(true);
       if (tasks.isEmpty()) {
         // No task found, or some problems to init tasks
         // return and retry in next interval.
-        return;
+        return false;

Review Comment:
   `runTasks()` returns `false` when `getTasks(true)` is empty, which stops 
future scheduling even though the comment says it should “retry in next 
interval”. This changes the `BackgroundService` contract and can permanently 
halt services on transient task-creation failures. Consider scheduling the next 
run even when the queue is empty (and adjust logging accordingly).



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java:
##########
@@ -226,8 +227,8 @@ public void testMultithreadedDirectoryDeletion() throws 
Exception {
             return future;
           });
       ozoneManager.getKeyManager().getDirDeletingService().suspend();
-      DirectoryDeletingService.DirDeletingTask dirDeletingTask =
-          ozoneManager.getKeyManager().getDirDeletingService().new 
DirDeletingTask(null);
+      DirDeletingTask dirDeletingTask = new DirDeletingTask(null, false,
+          ozoneManager.getKeyManager().getDirDeletingService());
 
       dirDeletingTask.processDeletedDirsForStore(null, 
ozoneManager.getKeyManager(), 1, 6000);

Review Comment:
   `testMultithreadedDirectoryDeletion` still mocks 
`CompletableFuture.supplyAsync` and asserts `futureList` has `threadCount` 
entries, but `DirDeletingTask.processDeletedDirsForStore(...)` no longer uses 
`CompletableFuture` (and is invoked with `allowForks=false`), so `futureList` 
will stay empty and the assertion will fail. Update this test to validate the 
new ForkJoin-based parallelism (or adjust it to the new execution path).



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java:
##########
@@ -738,38 +757,38 @@ startTime, getOzoneManager().getKeyManager(),
     public BackgroundTaskResult call() {
       // Check if this is the Leader OM. If not leader, no need to execute this
       // task.
-      if (shouldRun()) {
-        final long run = getRunCount().incrementAndGet();
+      if (dds.shouldRun()) {
+        final long run = dds.getRunCount().incrementAndGet();
         if (snapshotId == null) {
           LOG.debug("Running DirectoryDeletingService for active object store, 
{}", run);
         } else {
           LOG.debug("Running DirectoryDeletingService for snapshot : {}, {}", 
snapshotId, run);
         }
-        OmSnapshotManager omSnapshotManager = 
getOzoneManager().getOmSnapshotManager();
+        OmSnapshotManager omSnapshotManager = 
dds.getOzoneManager().getOmSnapshotManager();
         SnapshotInfo snapInfo = null;
         try {
           snapInfo = snapshotId == null ? null :
-              SnapshotUtils.getSnapshotInfo(getOzoneManager(), 
snapshotChainManager, snapshotId);
+              SnapshotUtils.getSnapshotInfo(dds.getOzoneManager(), 
dds.snapshotChainManager, snapshotId);
           if (snapInfo != null) {
             if (snapInfo.isDeepCleanedDeletedDir()) {
               LOG.info("Snapshot {} has already been deep cleaned directory. 
Skipping the snapshot in this iteration.",
                   snapInfo.getSnapshotId());
               return BackgroundTaskResult.EmptyTaskResult.newResult();
             }
-            if 
(!OmSnapshotManager.areSnapshotChangesFlushedToDB(getOzoneManager().getMetadataManager(),
 snapInfo)) {
+            if 
(!areSnapshotChangesFlushedToDB(dds.getOzoneManager().getMetadataManager(), 
snapInfo)) {
               LOG.info("Skipping snapshot processing since changes to snapshot 
{} have not been flushed to disk",
                   snapInfo);
               return BackgroundTaskResult.EmptyTaskResult.newResult();
             }
-          } else if (!isPreviousPurgeTransactionFlushed()) {
+          } else if (!dds.isPreviousPurgeTransactionFlushed()) {
             return BackgroundTaskResult.EmptyTaskResult.newResult();
           }
           try (UncheckedAutoCloseableSupplier<OmSnapshot> omSnapshot = 
snapInfo == null ? null :
               omSnapshotManager.getActiveSnapshot(snapInfo.getVolumeName(), 
snapInfo.getBucketName(),
                   snapInfo.getName())) {
-            KeyManager keyManager = snapInfo == null ? 
getOzoneManager().getKeyManager()
+            KeyManager keyManager = snapInfo == null ? 
dds.getOzoneManager().getKeyManager()

Review Comment:
   Access of an element annotated with `@VisibleForTesting` found in production 
code.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +142,183 @@ protected void execTaskCompletion() { }
    * Run one or more background tasks concurrently.
    * Wait until all tasks to return the result.
    */
-  public class PeriodicalTask implements Runnable {
-    @Override
-    public void run() {
-      // wait for previous set of tasks to complete
-      try {
-        future.join();
-      } catch (RuntimeException e) {
-        LOG.error("Background service execution failed.", e);
-      } finally {
-        execTaskCompletion();
-      }
+  public class PeriodicalTask extends RecursiveAction {
+    private final Queue<BackgroundTaskForkJoin> tasksInFlight;
+    private final AtomicReference<Boolean> isShutdown;
+    private final ScheduledExecutorService scheduledExecuterService;
+
+    public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) {
+      this.tasksInFlight = new LinkedList<>();
+      this.isShutdown = BackgroundService.this.isShutdown;
+      this.scheduledExecuterService = scheduledExecutorService;
+    }
+
+    private PeriodicalTask(PeriodicalTask other) {
+      this.tasksInFlight = other.tasksInFlight;
+      this.isShutdown = other.isShutdown;
+      this.scheduledExecuterService = other.scheduledExecuterService;

Review Comment:
   Field name `scheduledExecuterService` is misspelled ("Executer" vs 
"Executor"). Renaming to `scheduledExecutorService` would improve readability 
and avoid propagating the typo to other code.
   ```suggestion
       private final ScheduledExecutorService scheduledExecutorService;
   
       public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) 
{
         this.tasksInFlight = new LinkedList<>();
         this.isShutdown = BackgroundService.this.isShutdown;
         this.scheduledExecutorService = scheduledExecutorService;
       }
   
       private PeriodicalTask(PeriodicalTask other) {
         this.tasksInFlight = other.tasksInFlight;
         this.isShutdown = other.isShutdown;
         this.scheduledExecutorService = other.scheduledExecutorService;
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java:
##########
@@ -705,9 +724,9 @@ private boolean processDeletedDirectories(SnapshotInfo 
currentSnapshotInfo, KeyM
           subFileNum += purgePathRequest.getDeletedSubFilesCount();
         }
 
-        optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum,
+        dds.optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum,
             subFileNum, allSubDirList, purgePathRequestList, snapshotTableKey,
-            startTime, getOzoneManager().getKeyManager(),
+            startTime, dds.getOzoneManager().getKeyManager(),

Review Comment:
   Access of an element annotated with `@VisibleForTesting` found in production 
code.
   ```suggestion
               startTime, keyManager,
   ```



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +142,183 @@ protected void execTaskCompletion() { }
    * Run one or more background tasks concurrently.
    * Wait until all tasks to return the result.
    */
-  public class PeriodicalTask implements Runnable {
-    @Override
-    public void run() {
-      // wait for previous set of tasks to complete
-      try {
-        future.join();
-      } catch (RuntimeException e) {
-        LOG.error("Background service execution failed.", e);
-      } finally {
-        execTaskCompletion();
-      }
+  public class PeriodicalTask extends RecursiveAction {
+    private final Queue<BackgroundTaskForkJoin> tasksInFlight;
+    private final AtomicReference<Boolean> isShutdown;
+    private final ScheduledExecutorService scheduledExecuterService;
+
+    public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) {
+      this.tasksInFlight = new LinkedList<>();
+      this.isShutdown = BackgroundService.this.isShutdown;
+      this.scheduledExecuterService = scheduledExecutorService;
+    }
+
+    private PeriodicalTask(PeriodicalTask other) {
+      this.tasksInFlight = other.tasksInFlight;
+      this.isShutdown = other.isShutdown;
+      this.scheduledExecuterService = other.scheduledExecuterService;
+    }
+
+    private boolean performIfNotShutdown(Runnable runnable) {
+      return isShutdown.updateAndGet((shutdown) -> {
+        if (!shutdown) {
+          runnable.run();
+        }
+        return shutdown;
+      });
+    }
 
+    private <T> boolean performIfNotShutdown(Consumer<T> consumer, T t) {
+      return isShutdown.updateAndGet((shutdown) -> {
+        if (!shutdown) {
+          consumer.accept(t);
+        }
+        return shutdown;
+      });
+    }
+
+    private boolean runTasks() {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Running background service : {}", serviceName);
       }
-      BackgroundTaskQueue tasks = getTasks();
+      if (isShutdown.get()) {
+        return false;
+      }
+      if (!tasksInFlight.isEmpty()) {
+        LOG.warn("Tasks are still in flight service {}. This should not happen 
schedule should only begin once all " +
+            "tasks from schedules have completed execution.", serviceName);
+        tasksInFlight.clear();
+      }
+
+      BackgroundTaskQueue tasks = getTasks(true);
       if (tasks.isEmpty()) {
         // No task found, or some problems to init tasks
         // return and retry in next interval.
-        return;
+        return false;
       }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Number of background tasks to execute : {}", tasks.size());
       }
-      synchronized (BackgroundService.this) {
-        while (!tasks.isEmpty()) {
-          BackgroundTask task = tasks.poll();
-          future = future.thenCombine(CompletableFuture.runAsync(() -> {
-            long startTime = System.nanoTime();
-            try {
-              BackgroundTaskResult result = task.call();
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("task execution result size {}", result.getSize());
-              }
-            } catch (Throwable e) {
-              LOG.error("Background task execution failed", e);
-              if (e instanceof Error) {
-                throw (Error) e;
-              }
-            } finally {
-              long endTime = System.nanoTime();
-              if (endTime - startTime > serviceTimeoutInNanos) {
-                LOG.warn("{} Background task execution took {}ns > 
{}ns(timeout)",
-                    serviceName, endTime - startTime, serviceTimeoutInNanos);
-              }
-            }
-          }, exec).exceptionally(e -> null), (Void1, Void) -> null);
+      Consumer<BackgroundTaskForkJoin> taskForkHandler = task -> {
+        task.fork();
+        tasksInFlight.offer(task);
+      };
+      while (!tasks.isEmpty()) {
+        BackgroundTask task = tasks.poll();
+        // Wrap the task in a ForkJoin wrapper and fork it.
+        BackgroundTaskForkJoin forkJoinTask = new BackgroundTaskForkJoin(task);
+        if (performIfNotShutdown(taskForkHandler, forkJoinTask)) {
+          return false;
+        }
+      }
+      Consumer<BackgroundTaskForkJoin> taskCompletionHandler = task -> {
+        BackgroundTaskForkJoin.BackgroundTaskForkResult result = task.join();
+        // Check for exception first in the task execution.
+        if (result.getThrowable() != null) {
+          LOG.error("Background task execution failed", result.getThrowable());
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("task execution result size {}", 
result.getResult().getSize());
+          }
+        }
+        if (result.getTotalExecutionTime() > serviceTimeoutInNanos) {
+          LOG.warn("{} Background task execution took {}ns > {}ns(timeout)",
+              serviceName, result.getTotalExecutionTime(), 
serviceTimeoutInNanos);
+        }
+      };
+      while (!tasksInFlight.isEmpty()) {
+        BackgroundTaskForkJoin taskInFlight = tasksInFlight.poll();
+        // Join the tasks forked before and wait for the result one by one.
+        if (performIfNotShutdown(taskCompletionHandler, taskInFlight)) {
+          return false;
         }
       }
+      return true;
+    }
+
+    private void scheduleNextTask() {
+      performIfNotShutdown(() -> {
+        if (scheduledExecuterService != null) {
+          scheduledExecuterService.schedule(() -> exec.submit(new 
PeriodicalTask(this)),
+              intervalInMillis, TimeUnit.MILLISECONDS);
+        }
+      });
+    }
+
+    @Override
+    public void compute() {
+      future = new CompletableFuture<>();
+      if (runTasks()) {
+        scheduleNextTask();
+      } else {
+        LOG.debug("Service {} is shutdown. Cancelling all schedules of all 
tasks.", serviceName);
+      }
+      future.complete(null);
     }
   }
 
   // shutdown and make sure all threads are properly released.
-  public synchronized void shutdown() {
+  public void shutdown() {
     LOG.info("Shutting down service {}", this.serviceName);
-    exec.shutdown();
-    try {
-      if (!exec.awaitTermination(60, TimeUnit.SECONDS)) {
-        exec.shutdownNow();
+    final ThreadGroup threadGroupToBeClosed;
+    final ForkJoinPool execToShutdown;
+    final UncheckedAutoCloseableSupplier<ScheduledExecutorService> 
periodicTaskSchedulerToBeClosed;
+    // Set the shutdown flag to true to prevent new tasks from being submitted.
+    synchronized (this) {
+      periodicTaskSchedulerToBeClosed = periodicTaskScheduler;
+      threadGroupToBeClosed = threadGroup;
+      execToShutdown = exec;
+      exec = null;
+      threadGroup = null;
+      periodicTaskScheduler = null;
+      if (isShutdown != null) {
+        this.isShutdown.set(true);
+      }
+      isShutdown = null;
+    }
+    if (execToShutdown != null) {
+      execToShutdown.shutdown();
+      try {
+        if (!execToShutdown.awaitTermination(60, TimeUnit.SECONDS)) {
+          execToShutdown.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        // Re-interrupt the thread while catching InterruptedException
+        Thread.currentThread().interrupt();
+        execToShutdown.shutdownNow();
       }
-    } catch (InterruptedException e) {
-      // Re-interrupt the thread while catching InterruptedException
-      Thread.currentThread().interrupt();
-      exec.shutdownNow();
     }
-    if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) {
-      threadGroup.destroy();
+    if (periodicTaskSchedulerToBeClosed != null) {
+      periodicTaskSchedulerToBeClosed.close();
+    }
+    if (threadGroupToBeClosed != null && !threadGroupToBeClosed.isDestroyed()) 
{
+      threadGroupToBeClosed.destroy();

Review Comment:
   `shutdown()` calls `threadGroupToBeClosed.destroy()` without ensuring the 
group has no active threads. `ThreadGroup.destroy()` throws 
`IllegalThreadStateException` if the group still has active threads, which can 
make shutdown fail unexpectedly. Consider checking `activeCount()==0` (as 
before), or catching/handling the exception and/or waiting longer for worker 
termination before destroying the group.
   ```suggestion
         try {
           int activeCount = threadGroupToBeClosed.activeCount();
           if (activeCount == 0) {
             threadGroupToBeClosed.destroy();
           } else {
             LOG.warn("Skipping destroy of thread group {} as it still has {} 
active threads",
                 threadGroupToBeClosed.getName(), activeCount);
           }
         } catch (IllegalThreadStateException e) {
           LOG.warn("Failed to destroy thread group {} due to active threads",
               threadGroupToBeClosed.getName(), e);
         }
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java:
##########
@@ -688,10 +707,10 @@ private boolean processDeletedDirectories(SnapshotInfo 
currentSnapshotInfo, KeyM
               .build());
 
           boolean isDirReclaimable = 
reclaimableDirFilter.apply(pendingDeletedDirInfo);
-          Optional<PurgePathRequest> request = prepareDeleteDirRequest(
+          Optional<PurgePathRequest> request = dds.prepareDeleteDirRequest(
               pendingDeletedDirInfo.getValue(),
               pendingDeletedDirInfo.getKey(), isDirReclaimable, allSubDirList,
-              getOzoneManager().getKeyManager(), reclaimableFileFilter, 
remainNum);
+              dds.getOzoneManager().getKeyManager(), reclaimableFileFilter, 
remainNum);

Review Comment:
   Access of an element annotated with `@VisibleForTesting` found in production 
code.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java:
##########
@@ -596,26 +587,54 @@ void processDeletedDirsForStore(SnapshotInfo 
currentSnapshotInfo, KeyManager key
         // This is to avoid race condition b/w purge request and snapshot 
chain update. For AOS taking the global
         // snapshotId since AOS could process multiple buckets in one 
iteration. While using path
         // previous snapshotId for a snapshot since it would process only one 
bucket.
+        SnapshotChainManager snapshotChainManager = dds.snapshotChainManager;
         UUID expectedPreviousSnapshotId = currentSnapshotInfo == null ?
             snapshotChainManager.getLatestGlobalSnapshotId() :
             SnapshotUtils.getPreviousSnapshotId(currentSnapshotInfo, 
snapshotChainManager);
         Map<UUID, Pair<Long, Long>> exclusiveSizeMap = Maps.newConcurrentMap();
 
-        CompletableFuture<Boolean> processedAllDeletedDirs = 
CompletableFuture.completedFuture(true);
-        for (int i = 0; i < numberOfParallelThreadsPerStore; i++) {
-          CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() 
-> {
-            try {
-              return processDeletedDirectories(currentSnapshotInfo, 
keyManager, dirSupplier,
-                  expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt, 
remainNum);
-            } catch (Throwable e) {
-              return false;
-            }
-          }, isThreadPoolActive(deletionThreadPool) ? deletionThreadPool : 
ForkJoinPool.commonPool());
-          processedAllDeletedDirs = 
processedAllDeletedDirs.thenCombine(future, (a, b) -> a && b);
+        boolean processedAllDeletedDirs;
+        int maxForksPerStore = dds.maxForksPerStore;
+        // If allowed to fork, create multiple tasks to process deleted 
directories tasks in parallel.
+        if (allowForks) {
+          Queue<RecursiveTask<Boolean>> recursiveTasks = new LinkedList<>();
+          processedAllDeletedDirs = true;
+          for (int i = 0; i < maxForksPerStore; i++) {
+            RecursiveTask<Boolean> task = new RecursiveTask<Boolean>() {
+              private static final long serialVersionUID = 1L;
+              private final transient SnapshotInfo snapshotInfo = 
currentSnapshotInfo;
+              private final transient DeletedDirSupplier deletedDirSupplier = 
dirSupplier;
+              private final transient KeyManager km = keyManager;
+
+              @Override
+              protected Boolean compute() {
+                try {
+                  return processDeletedDirectories(snapshotInfo, km, 
deletedDirSupplier,
+                      expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt, 
remainNum);
+                } catch (Throwable e) {
+                  return false;
+                }
+              }
+            };
+            task.fork();
+            recursiveTasks.offer(task);

Review Comment:
   Method `processDeletedDirsForStore` ignores the return value of 
`Queue<RecursiveTask<Boolean>>.offer`, which returns `false` when the element 
cannot be added.
   ```suggestion
               if (!recursiveTasks.offer(task)) {
                 // If the task cannot be enqueued, ensure it is joined and
                 // mark that not all deleted directories were processed.
                 task.join();
                 processedAllDeletedDirs = false;
                 break;
               }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to