Copilot commented on code in PR #9686:
URL: https://github.com/apache/ozone/pull/9686#discussion_r2739098363
##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +142,183 @@ protected void execTaskCompletion() { }
* Run one or more background tasks concurrently.
* Wait until all tasks to return the result.
*/
- public class PeriodicalTask implements Runnable {
- @Override
- public void run() {
- // wait for previous set of tasks to complete
- try {
- future.join();
- } catch (RuntimeException e) {
- LOG.error("Background service execution failed.", e);
- } finally {
- execTaskCompletion();
- }
+ public class PeriodicalTask extends RecursiveAction {
+ private final Queue<BackgroundTaskForkJoin> tasksInFlight;
+ private final AtomicReference<Boolean> isShutdown;
+ private final ScheduledExecutorService scheduledExecuterService;
+
+ public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) {
+ this.tasksInFlight = new LinkedList<>();
+ this.isShutdown = BackgroundService.this.isShutdown;
+ this.scheduledExecuterService = scheduledExecutorService;
+ }
+
+ private PeriodicalTask(PeriodicalTask other) {
+ this.tasksInFlight = other.tasksInFlight;
+ this.isShutdown = other.isShutdown;
+ this.scheduledExecuterService = other.scheduledExecuterService;
+ }
+
+ private boolean performIfNotShutdown(Runnable runnable) {
+ return isShutdown.updateAndGet((shutdown) -> {
+ if (!shutdown) {
+ runnable.run();
+ }
+ return shutdown;
+ });
+ }
+ private <T> boolean performIfNotShutdown(Consumer<T> consumer, T t) {
+ return isShutdown.updateAndGet((shutdown) -> {
+ if (!shutdown) {
+ consumer.accept(t);
+ }
+ return shutdown;
+ });
+ }
+
+ private boolean runTasks() {
if (LOG.isDebugEnabled()) {
LOG.debug("Running background service : {}", serviceName);
}
- BackgroundTaskQueue tasks = getTasks();
+ if (isShutdown.get()) {
+ return false;
+ }
+ if (!tasksInFlight.isEmpty()) {
+ LOG.warn("Tasks are still in flight service {}. This should not happen
schedule should only begin once all " +
+ "tasks from schedules have completed execution.", serviceName);
+ tasksInFlight.clear();
+ }
+
+ BackgroundTaskQueue tasks = getTasks(true);
if (tasks.isEmpty()) {
// No task found, or some problems to init tasks
// return and retry in next interval.
- return;
+ return false;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Number of background tasks to execute : {}", tasks.size());
}
- synchronized (BackgroundService.this) {
- while (!tasks.isEmpty()) {
- BackgroundTask task = tasks.poll();
- future = future.thenCombine(CompletableFuture.runAsync(() -> {
- long startTime = System.nanoTime();
- try {
- BackgroundTaskResult result = task.call();
- if (LOG.isDebugEnabled()) {
- LOG.debug("task execution result size {}", result.getSize());
- }
- } catch (Throwable e) {
- LOG.error("Background task execution failed", e);
- if (e instanceof Error) {
- throw (Error) e;
- }
- } finally {
- long endTime = System.nanoTime();
- if (endTime - startTime > serviceTimeoutInNanos) {
- LOG.warn("{} Background task execution took {}ns >
{}ns(timeout)",
- serviceName, endTime - startTime, serviceTimeoutInNanos);
- }
- }
- }, exec).exceptionally(e -> null), (Void1, Void) -> null);
+ Consumer<BackgroundTaskForkJoin> taskForkHandler = task -> {
+ task.fork();
+ tasksInFlight.offer(task);
+ };
+ while (!tasks.isEmpty()) {
+ BackgroundTask task = tasks.poll();
+ // Wrap the task in a ForkJoin wrapper and fork it.
+ BackgroundTaskForkJoin forkJoinTask = new BackgroundTaskForkJoin(task);
+ if (performIfNotShutdown(taskForkHandler, forkJoinTask)) {
+ return false;
+ }
+ }
+ Consumer<BackgroundTaskForkJoin> taskCompletionHandler = task -> {
+ BackgroundTaskForkJoin.BackgroundTaskForkResult result = task.join();
+ // Check for exception first in the task execution.
+ if (result.getThrowable() != null) {
+ LOG.error("Background task execution failed", result.getThrowable());
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("task execution result size {}",
result.getResult().getSize());
+ }
+ }
+ if (result.getTotalExecutionTime() > serviceTimeoutInNanos) {
+ LOG.warn("{} Background task execution took {}ns > {}ns(timeout)",
+ serviceName, result.getTotalExecutionTime(),
serviceTimeoutInNanos);
+ }
+ };
+ while (!tasksInFlight.isEmpty()) {
+ BackgroundTaskForkJoin taskInFlight = tasksInFlight.poll();
+ // Join the tasks forked before and wait for the result one by one.
+ if (performIfNotShutdown(taskCompletionHandler, taskInFlight)) {
+ return false;
}
}
+ return true;
+ }
+
+ private void scheduleNextTask() {
+ performIfNotShutdown(() -> {
+ if (scheduledExecuterService != null) {
+ scheduledExecuterService.schedule(() -> exec.submit(new
PeriodicalTask(this)),
+ intervalInMillis, TimeUnit.MILLISECONDS);
Review Comment:
The scheduled runnable in `scheduleNextTask()` calls `exec.submit(...)`
without re-checking the shutdown state or whether `exec` is still non-null. Since
`shutdown()` sets `exec = null` and the shared scheduler uses `shutdown()` (not
`shutdownNow()`), already-scheduled tasks can still execute after shutdown and
hit an NPE or a `RejectedExecutionException`. Guard inside the scheduled runnable
(check the shutdown flag and `exec != null`) and/or cancel scheduled tasks on
shutdown. Read `exec` into a local variable once before checking it. Otherwise
`shutdown()` can null the field between the `exec != null` check and the
`exec.submit(...)` call.
```suggestion
scheduledExecuterService.schedule(() -> {
if (!isShutdown.get() && exec != null) {
exec.submit(new PeriodicalTask(this));
}
}, intervalInMillis, TimeUnit.MILLISECONDS);
```
##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+import java.util.concurrent.RecursiveTask;
+
+/**
+ * A ForkJoin wrapper for {@link BackgroundTask} that enables parallel
execution
+ * in a ForkJoinPool while keeping the BackgroundTask interface simple.
+ *
+ * <p>This wrapper handles the RecursiveTask mechanics, timing, and exception
+ * handling, allowing BackgroundTask implementations to focus on their
business logic.
+ */
+public class BackgroundTaskForkJoin extends
RecursiveTask<BackgroundTaskForkJoin.BackgroundTaskForkResult> {
+ private static final long serialVersionUID = 1L;
+ private final transient BackgroundTask backgroundTask;
+
+ public BackgroundTaskForkJoin(BackgroundTask backgroundTask) {
Review Comment:
This class is added under the `hdds-common` module but depends on
`BackgroundTask`/`BackgroundTaskResult`, which are defined in
`hdds-server-framework` (and `hdds-common` does not depend on it). Since
`hdds-server-framework` already depends on `hdds-common`, adding the reverse
dependency would create a cycle; as-is, this likely won’t compile. Consider
moving this class into `hdds-server-framework`, or into a new shared module
that both can depend on.
##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +142,183 @@ protected void execTaskCompletion() { }
* Run one or more background tasks concurrently.
* Wait until all tasks to return the result.
*/
- public class PeriodicalTask implements Runnable {
- @Override
- public void run() {
- // wait for previous set of tasks to complete
- try {
- future.join();
- } catch (RuntimeException e) {
- LOG.error("Background service execution failed.", e);
- } finally {
- execTaskCompletion();
- }
+ public class PeriodicalTask extends RecursiveAction {
+ private final Queue<BackgroundTaskForkJoin> tasksInFlight;
+ private final AtomicReference<Boolean> isShutdown;
+ private final ScheduledExecutorService scheduledExecuterService;
+
+ public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) {
+ this.tasksInFlight = new LinkedList<>();
+ this.isShutdown = BackgroundService.this.isShutdown;
+ this.scheduledExecuterService = scheduledExecutorService;
+ }
+
+ private PeriodicalTask(PeriodicalTask other) {
+ this.tasksInFlight = other.tasksInFlight;
+ this.isShutdown = other.isShutdown;
+ this.scheduledExecuterService = other.scheduledExecuterService;
+ }
+
+ private boolean performIfNotShutdown(Runnable runnable) {
+ return isShutdown.updateAndGet((shutdown) -> {
+ if (!shutdown) {
+ runnable.run();
+ }
+ return shutdown;
+ });
+ }
+ private <T> boolean performIfNotShutdown(Consumer<T> consumer, T t) {
+ return isShutdown.updateAndGet((shutdown) -> {
+ if (!shutdown) {
+ consumer.accept(t);
+ }
+ return shutdown;
+ });
+ }
+
+ private boolean runTasks() {
if (LOG.isDebugEnabled()) {
LOG.debug("Running background service : {}", serviceName);
}
- BackgroundTaskQueue tasks = getTasks();
+ if (isShutdown.get()) {
+ return false;
+ }
+ if (!tasksInFlight.isEmpty()) {
+ LOG.warn("Tasks are still in flight service {}. This should not happen
schedule should only begin once all " +
+ "tasks from schedules have completed execution.", serviceName);
+ tasksInFlight.clear();
+ }
+
+ BackgroundTaskQueue tasks = getTasks(true);
if (tasks.isEmpty()) {
// No task found, or some problems to init tasks
// return and retry in next interval.
- return;
+ return false;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Number of background tasks to execute : {}", tasks.size());
}
- synchronized (BackgroundService.this) {
- while (!tasks.isEmpty()) {
- BackgroundTask task = tasks.poll();
- future = future.thenCombine(CompletableFuture.runAsync(() -> {
- long startTime = System.nanoTime();
- try {
- BackgroundTaskResult result = task.call();
- if (LOG.isDebugEnabled()) {
- LOG.debug("task execution result size {}", result.getSize());
- }
- } catch (Throwable e) {
- LOG.error("Background task execution failed", e);
- if (e instanceof Error) {
- throw (Error) e;
- }
- } finally {
- long endTime = System.nanoTime();
- if (endTime - startTime > serviceTimeoutInNanos) {
- LOG.warn("{} Background task execution took {}ns >
{}ns(timeout)",
- serviceName, endTime - startTime, serviceTimeoutInNanos);
- }
- }
- }, exec).exceptionally(e -> null), (Void1, Void) -> null);
+ Consumer<BackgroundTaskForkJoin> taskForkHandler = task -> {
+ task.fork();
+ tasksInFlight.offer(task);
Review Comment:
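The `accept` method of this `Consumer` ignores the return value of
`Queue<BackgroundTaskForkJoin>.offer`, which is `false` when the element could not be enqueued.
`LinkedList.offer` always returns `true`, so this check mainly guards against a later change of queue implementation.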
```suggestion
if (!tasksInFlight.offer(task)) {
LOG.error("Failed to enqueue background task for service {}. Task
will not be tracked.", serviceName);
}
```
##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+import java.util.concurrent.RecursiveTask;
+
+/**
+ * A ForkJoin wrapper for {@link BackgroundTask} that enables parallel
execution
+ * in a ForkJoinPool while keeping the BackgroundTask interface simple.
+ *
+ * <p>This wrapper handles the RecursiveTask mechanics, timing, and exception
+ * handling, allowing BackgroundTask implementations to focus on their
business logic.
+ */
+public class BackgroundTaskForkJoin extends
RecursiveTask<BackgroundTaskForkJoin.BackgroundTaskForkResult> {
+ private static final long serialVersionUID = 1L;
+ private final transient BackgroundTask backgroundTask;
+
+ public BackgroundTaskForkJoin(BackgroundTask backgroundTask) {
+ this.backgroundTask = backgroundTask;
+ }
+
+ /**
+ * Result wrapper containing the task result, execution time, and any
exception.
+ */
+ public static final class BackgroundTaskForkResult {
+ private final BackgroundTaskResult result;
+ private final Throwable throwable;
+ private final long startTime;
+ private final long endTime;
+
+ private BackgroundTaskForkResult(BackgroundTaskResult result, long
startTime, long endTime, Throwable throwable) {
+ this.endTime = endTime;
+ this.result = result;
+ this.startTime = startTime;
+ this.throwable = throwable;
+ }
+
+ public long getTotalExecutionTime() {
+ return endTime - startTime;
+ }
+
+ public BackgroundTaskResult getResult() {
+ return result;
+ }
+
+ public Throwable getThrowable() {
+ return throwable;
+ }
+ }
+
+ @Override
+ protected BackgroundTaskForkResult compute() {
+ long startTime = System.nanoTime();
+ BackgroundTaskResult result = null;
+ Throwable throwable = null;
+ try {
+ result = backgroundTask.call();
+ } catch (Throwable e) {
+ throwable = e;
Review Comment:
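`BackgroundTaskForkJoin.compute()` catches every `Throwable` and only stores
it in the result. As a result, `Error`s (e.g. `OutOfMemoryError`) are
swallowed and the pool keeps running. The previous BackgroundService
implementation explicitly rethrew `Error`. Consider rethrowing `Error` after
capturing/logging it (or letting it propagate) so fatal JVM conditions aren’t
masked. Note that `task.join()` in the caller would then rethrow the `Error`, and
`PeriodicalTask.compute()` only calls `future.complete(null)` on the normal path,
so it should complete `future` in a `finally` block.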
```suggestion
throwable = e;
if (e instanceof Error) {
throw (Error) e;
}
```
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java:
##########
@@ -255,6 +255,9 @@ public BackgroundTaskResult call() throws Exception {
.getSnapshotId());
}
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("SST filtering task interrupted for snapshot: {}",
snapShotTableKey, e);
Review Comment:
After catching `InterruptedException`, the code re-interrupts the thread but
continues processing subsequent snapshots. Typically interruption should stop
the task (e.g., break the loop / return) to allow the service to shut down
promptly and avoid doing more work on an interrupted thread.
```suggestion
LOG.error("SST filtering task interrupted for snapshot: {}",
snapShotTableKey, e);
break;
```
##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +142,183 @@ protected void execTaskCompletion() { }
* Run one or more background tasks concurrently.
* Wait until all tasks to return the result.
*/
- public class PeriodicalTask implements Runnable {
- @Override
- public void run() {
- // wait for previous set of tasks to complete
- try {
- future.join();
- } catch (RuntimeException e) {
- LOG.error("Background service execution failed.", e);
- } finally {
- execTaskCompletion();
- }
+ public class PeriodicalTask extends RecursiveAction {
+ private final Queue<BackgroundTaskForkJoin> tasksInFlight;
+ private final AtomicReference<Boolean> isShutdown;
+ private final ScheduledExecutorService scheduledExecuterService;
+
+ public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) {
+ this.tasksInFlight = new LinkedList<>();
+ this.isShutdown = BackgroundService.this.isShutdown;
+ this.scheduledExecuterService = scheduledExecutorService;
+ }
+
+ private PeriodicalTask(PeriodicalTask other) {
+ this.tasksInFlight = other.tasksInFlight;
+ this.isShutdown = other.isShutdown;
+ this.scheduledExecuterService = other.scheduledExecuterService;
+ }
+
+ private boolean performIfNotShutdown(Runnable runnable) {
+ return isShutdown.updateAndGet((shutdown) -> {
+ if (!shutdown) {
+ runnable.run();
+ }
+ return shutdown;
+ });
+ }
+ private <T> boolean performIfNotShutdown(Consumer<T> consumer, T t) {
+ return isShutdown.updateAndGet((shutdown) -> {
+ if (!shutdown) {
+ consumer.accept(t);
+ }
+ return shutdown;
+ });
+ }
+
+ private boolean runTasks() {
if (LOG.isDebugEnabled()) {
LOG.debug("Running background service : {}", serviceName);
}
- BackgroundTaskQueue tasks = getTasks();
+ if (isShutdown.get()) {
+ return false;
+ }
+ if (!tasksInFlight.isEmpty()) {
+ LOG.warn("Tasks are still in flight service {}. This should not happen
schedule should only begin once all " +
+ "tasks from schedules have completed execution.", serviceName);
+ tasksInFlight.clear();
+ }
+
+ BackgroundTaskQueue tasks = getTasks(true);
if (tasks.isEmpty()) {
// No task found, or some problems to init tasks
// return and retry in next interval.
- return;
+ return false;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Number of background tasks to execute : {}", tasks.size());
}
- synchronized (BackgroundService.this) {
- while (!tasks.isEmpty()) {
- BackgroundTask task = tasks.poll();
- future = future.thenCombine(CompletableFuture.runAsync(() -> {
- long startTime = System.nanoTime();
- try {
- BackgroundTaskResult result = task.call();
- if (LOG.isDebugEnabled()) {
- LOG.debug("task execution result size {}", result.getSize());
- }
- } catch (Throwable e) {
- LOG.error("Background task execution failed", e);
- if (e instanceof Error) {
- throw (Error) e;
- }
- } finally {
- long endTime = System.nanoTime();
- if (endTime - startTime > serviceTimeoutInNanos) {
- LOG.warn("{} Background task execution took {}ns >
{}ns(timeout)",
- serviceName, endTime - startTime, serviceTimeoutInNanos);
- }
- }
- }, exec).exceptionally(e -> null), (Void1, Void) -> null);
+ Consumer<BackgroundTaskForkJoin> taskForkHandler = task -> {
+ task.fork();
+ tasksInFlight.offer(task);
+ };
+ while (!tasks.isEmpty()) {
+ BackgroundTask task = tasks.poll();
+ // Wrap the task in a ForkJoin wrapper and fork it.
+ BackgroundTaskForkJoin forkJoinTask = new BackgroundTaskForkJoin(task);
+ if (performIfNotShutdown(taskForkHandler, forkJoinTask)) {
+ return false;
+ }
+ }
+ Consumer<BackgroundTaskForkJoin> taskCompletionHandler = task -> {
+ BackgroundTaskForkJoin.BackgroundTaskForkResult result = task.join();
+ // Check for exception first in the task execution.
+ if (result.getThrowable() != null) {
+ LOG.error("Background task execution failed", result.getThrowable());
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("task execution result size {}",
result.getResult().getSize());
+ }
+ }
+ if (result.getTotalExecutionTime() > serviceTimeoutInNanos) {
+ LOG.warn("{} Background task execution took {}ns > {}ns(timeout)",
+ serviceName, result.getTotalExecutionTime(),
serviceTimeoutInNanos);
+ }
+ };
+ while (!tasksInFlight.isEmpty()) {
+ BackgroundTaskForkJoin taskInFlight = tasksInFlight.poll();
+ // Join the tasks forked before and wait for the result one by one.
+ if (performIfNotShutdown(taskCompletionHandler, taskInFlight)) {
+ return false;
}
}
+ return true;
+ }
+
+ private void scheduleNextTask() {
+ performIfNotShutdown(() -> {
+ if (scheduledExecuterService != null) {
+ scheduledExecuterService.schedule(() -> exec.submit(new
PeriodicalTask(this)),
+ intervalInMillis, TimeUnit.MILLISECONDS);
+ }
+ });
+ }
+
+ @Override
+ public void compute() {
+ future = new CompletableFuture<>();
+ if (runTasks()) {
+ scheduleNextTask();
+ } else {
+ LOG.debug("Service {} is shutdown. Cancelling all schedules of all
tasks.", serviceName);
+ }
+ future.complete(null);
}
}
// shutdown and make sure all threads are properly released.
- public synchronized void shutdown() {
+ public void shutdown() {
LOG.info("Shutting down service {}", this.serviceName);
- exec.shutdown();
- try {
- if (!exec.awaitTermination(60, TimeUnit.SECONDS)) {
- exec.shutdownNow();
+ final ThreadGroup threadGroupToBeClosed;
+ final ForkJoinPool execToShutdown;
+ final UncheckedAutoCloseableSupplier<ScheduledExecutorService>
periodicTaskSchedulerToBeClosed;
+ // Set the shutdown flag to true to prevent new tasks from being submitted.
+ synchronized (this) {
+ periodicTaskSchedulerToBeClosed = periodicTaskScheduler;
+ threadGroupToBeClosed = threadGroup;
+ execToShutdown = exec;
+ exec = null;
+ threadGroup = null;
+ periodicTaskScheduler = null;
+ if (isShutdown != null) {
+ this.isShutdown.set(true);
+ }
+ isShutdown = null;
+ }
+ if (execToShutdown != null) {
+ execToShutdown.shutdown();
+ try {
+ if (!execToShutdown.awaitTermination(60, TimeUnit.SECONDS)) {
+ execToShutdown.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ // Re-interrupt the thread while catching InterruptedException
+ Thread.currentThread().interrupt();
+ execToShutdown.shutdownNow();
}
- } catch (InterruptedException e) {
- // Re-interrupt the thread while catching InterruptedException
- Thread.currentThread().interrupt();
- exec.shutdownNow();
}
- if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) {
- threadGroup.destroy();
+ if (periodicTaskSchedulerToBeClosed != null) {
+ periodicTaskSchedulerToBeClosed.close();
+ }
+ if (threadGroupToBeClosed != null && !threadGroupToBeClosed.isDestroyed())
{
+ threadGroupToBeClosed.destroy();
}
}
- private void initExecutorAndThreadGroup() {
- threadGroup = new ThreadGroup(serviceName);
- ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setThreadFactory(r -> new Thread(threadGroup, r))
- .setDaemon(true)
- .setNameFormat(threadNamePrefix + serviceName + "#%d")
- .build();
- exec = (ScheduledThreadPoolExecutor)
Executors.newScheduledThreadPool(threadPoolSize, threadFactory);
+ private synchronized void initExecutorAndThreadGroup() {
+ try {
+ threadGroup = new ThreadGroup(serviceName);
+ Thread initThread = new Thread(threadGroup, () -> {
+ ForkJoinPool.ForkJoinWorkerThreadFactory factory =
+ pool -> {
+ ForkJoinWorkerThread thread = new ForkJoinWorkerThread(pool) {
+ };
+ thread.setDaemon(true);
+ thread.setName(threadNamePrefix + serviceName +
thread.getPoolIndex());
+ return thread;
+ };
+ exec = new ForkJoinPool(threadPoolSize, factory, null, false);
+ isShutdown = new AtomicReference<>(false);
Review Comment:
Background tasks frequently do blocking I/O (e.g. OM Ratis `submitRequest`,
RocksDB calls). Using a `ForkJoinPool` for blocking work can reduce parallelism
and hurt throughput unless blocking sections use
`ForkJoinPool.managedBlock(...)` (or a dedicated blocking pool is used).
Consider addressing blocking sections or documenting why ForkJoinPool is safe
here.
##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +142,183 @@ protected void execTaskCompletion() { }
* Run one or more background tasks concurrently.
* Wait until all tasks to return the result.
*/
- public class PeriodicalTask implements Runnable {
- @Override
- public void run() {
- // wait for previous set of tasks to complete
- try {
- future.join();
- } catch (RuntimeException e) {
- LOG.error("Background service execution failed.", e);
- } finally {
- execTaskCompletion();
- }
+ public class PeriodicalTask extends RecursiveAction {
+ private final Queue<BackgroundTaskForkJoin> tasksInFlight;
+ private final AtomicReference<Boolean> isShutdown;
+ private final ScheduledExecutorService scheduledExecuterService;
+
+ public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) {
+ this.tasksInFlight = new LinkedList<>();
+ this.isShutdown = BackgroundService.this.isShutdown;
+ this.scheduledExecuterService = scheduledExecutorService;
+ }
Review Comment:
`PeriodicalTask` no longer has a no-arg constructor. There are existing call
sites that still use `new PeriodicalTask()` (e.g.,
`hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java:68`),
which will fail compilation. Either restore a no-arg constructor delegating to
`this(null)` or update all call sites.
##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundServiceScheduler.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import org.apache.ratis.util.ReferenceCountedObject;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+
+/**
+ * Utility class to manage a shared background service using a {@link
ScheduledExecutorService}
+ * which is provided with a single-threaded {@link
ScheduledThreadPoolExecutor}.
+ * This class manages the lifecycle and reference counting for the executor
+ * to ensure proper resource cleanup.
+ *
+ * The executor is lazily initialized on the first invocation of the {@code
get()} method.
+ * It is shut down and released when no longer referenced, ensuring efficient
use
+ * of system resources. The shutdown process includes cleaning the reference
to the executor.
+ *
+ * This class is thread-safe.
+ */
+final class BackgroundServiceScheduler {
+ private static ReferenceCountedObject<ScheduledExecutorService> executor;
+
+ private BackgroundServiceScheduler() {
+
+ }
+
+ public static synchronized
UncheckedAutoCloseableSupplier<ScheduledExecutorService> get() {
+ if (executor == null) {
+ ScheduledThreadPoolExecutor scheduler = new
ScheduledThreadPoolExecutor(1);
+ executor = ReferenceCountedObject.wrap(scheduler, () -> { }, (shutdown)
-> {
Review Comment:
`BackgroundServiceScheduler` uses the default thread factory for
`ScheduledThreadPoolExecutor`, which creates non-daemon threads. Since this is
a shared background scheduler, a non-daemon thread can keep the JVM alive if
something forgets to close/release it. Consider using a daemon thread factory
(and naming the thread) for the scheduler.
##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +142,183 @@ protected void execTaskCompletion() { }
* Run one or more background tasks concurrently.
* Wait until all tasks to return the result.
*/
- public class PeriodicalTask implements Runnable {
- @Override
- public void run() {
- // wait for previous set of tasks to complete
- try {
- future.join();
- } catch (RuntimeException e) {
- LOG.error("Background service execution failed.", e);
- } finally {
- execTaskCompletion();
- }
+ public class PeriodicalTask extends RecursiveAction {
+ private final Queue<BackgroundTaskForkJoin> tasksInFlight;
+ private final AtomicReference<Boolean> isShutdown;
+ private final ScheduledExecutorService scheduledExecuterService;
+
+ public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) {
+ this.tasksInFlight = new LinkedList<>();
+ this.isShutdown = BackgroundService.this.isShutdown;
+ this.scheduledExecuterService = scheduledExecutorService;
+ }
+
+ private PeriodicalTask(PeriodicalTask other) {
+ this.tasksInFlight = other.tasksInFlight;
+ this.isShutdown = other.isShutdown;
+ this.scheduledExecuterService = other.scheduledExecuterService;
+ }
+
+ private boolean performIfNotShutdown(Runnable runnable) {
+ return isShutdown.updateAndGet((shutdown) -> {
+ if (!shutdown) {
+ runnable.run();
+ }
+ return shutdown;
+ });
+ }
+ private <T> boolean performIfNotShutdown(Consumer<T> consumer, T t) {
+ return isShutdown.updateAndGet((shutdown) -> {
+ if (!shutdown) {
+ consumer.accept(t);
+ }
+ return shutdown;
+ });
+ }
+
+ private boolean runTasks() {
if (LOG.isDebugEnabled()) {
LOG.debug("Running background service : {}", serviceName);
}
- BackgroundTaskQueue tasks = getTasks();
+ if (isShutdown.get()) {
+ return false;
+ }
+ if (!tasksInFlight.isEmpty()) {
+ LOG.warn("Tasks are still in flight service {}. This should not happen
schedule should only begin once all " +
+ "tasks from schedules have completed execution.", serviceName);
+ tasksInFlight.clear();
+ }
+
+ BackgroundTaskQueue tasks = getTasks(true);
if (tasks.isEmpty()) {
// No task found, or some problems to init tasks
// return and retry in next interval.
- return;
+ return false;
Review Comment:
`runTasks()` returns `false` when `getTasks(true)` is empty, which stops
future scheduling even though the comment says it should “retry in next
interval”. This changes the `BackgroundService` contract and can permanently
halt services on transient task-creation failures. In this case `compute()` also
logs that the service is shut down, which is misleading. Consider scheduling the
next run even when the queue is empty (and adjust logging accordingly).
##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java:
##########
@@ -226,8 +227,8 @@ public void testMultithreadedDirectoryDeletion() throws
Exception {
return future;
});
ozoneManager.getKeyManager().getDirDeletingService().suspend();
- DirectoryDeletingService.DirDeletingTask dirDeletingTask =
- ozoneManager.getKeyManager().getDirDeletingService().new
DirDeletingTask(null);
+ DirDeletingTask dirDeletingTask = new DirDeletingTask(null, false,
+ ozoneManager.getKeyManager().getDirDeletingService());
dirDeletingTask.processDeletedDirsForStore(null,
ozoneManager.getKeyManager(), 1, 6000);
Review Comment:
`testMultithreadedDirectoryDeletion` still mocks
`CompletableFuture.supplyAsync` and asserts `futureList` has `threadCount`
entries, but `DirDeletingTask.processDeletedDirsForStore(...)` no longer uses
`CompletableFuture` (and is invoked with `allowForks=false`), so `futureList`
will stay empty and the assertion will fail. Update this test to validate the
new ForkJoin-based parallelism (or adjust it to the new execution path).
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java:
##########
@@ -738,38 +757,38 @@ startTime, getOzoneManager().getKeyManager(),
public BackgroundTaskResult call() {
// Check if this is the Leader OM. If not leader, no need to execute this
// task.
- if (shouldRun()) {
- final long run = getRunCount().incrementAndGet();
+ if (dds.shouldRun()) {
+ final long run = dds.getRunCount().incrementAndGet();
if (snapshotId == null) {
LOG.debug("Running DirectoryDeletingService for active object store,
{}", run);
} else {
LOG.debug("Running DirectoryDeletingService for snapshot : {}, {}",
snapshotId, run);
}
- OmSnapshotManager omSnapshotManager =
getOzoneManager().getOmSnapshotManager();
+ OmSnapshotManager omSnapshotManager =
dds.getOzoneManager().getOmSnapshotManager();
SnapshotInfo snapInfo = null;
try {
snapInfo = snapshotId == null ? null :
- SnapshotUtils.getSnapshotInfo(getOzoneManager(),
snapshotChainManager, snapshotId);
+ SnapshotUtils.getSnapshotInfo(dds.getOzoneManager(),
dds.snapshotChainManager, snapshotId);
if (snapInfo != null) {
if (snapInfo.isDeepCleanedDeletedDir()) {
LOG.info("Snapshot {} has already been deep cleaned directory.
Skipping the snapshot in this iteration.",
snapInfo.getSnapshotId());
return BackgroundTaskResult.EmptyTaskResult.newResult();
}
- if
(!OmSnapshotManager.areSnapshotChangesFlushedToDB(getOzoneManager().getMetadataManager(),
snapInfo)) {
+ if
(!areSnapshotChangesFlushedToDB(dds.getOzoneManager().getMetadataManager(),
snapInfo)) {
LOG.info("Skipping snapshot processing since changes to snapshot
{} have not been flushed to disk",
snapInfo);
return BackgroundTaskResult.EmptyTaskResult.newResult();
}
- } else if (!isPreviousPurgeTransactionFlushed()) {
+ } else if (!dds.isPreviousPurgeTransactionFlushed()) {
return BackgroundTaskResult.EmptyTaskResult.newResult();
}
try (UncheckedAutoCloseableSupplier<OmSnapshot> omSnapshot =
snapInfo == null ? null :
omSnapshotManager.getActiveSnapshot(snapInfo.getVolumeName(),
snapInfo.getBucketName(),
snapInfo.getName())) {
- KeyManager keyManager = snapInfo == null ?
getOzoneManager().getKeyManager()
+ KeyManager keyManager = snapInfo == null ?
dds.getOzoneManager().getKeyManager()
Review Comment:
Access of an element annotated with `@VisibleForTesting` found in production
code.
##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +142,183 @@ protected void execTaskCompletion() { }
* Run one or more background tasks concurrently.
* Wait until all tasks to return the result.
*/
- public class PeriodicalTask implements Runnable {
- @Override
- public void run() {
- // wait for previous set of tasks to complete
- try {
- future.join();
- } catch (RuntimeException e) {
- LOG.error("Background service execution failed.", e);
- } finally {
- execTaskCompletion();
- }
+ public class PeriodicalTask extends RecursiveAction {
+ private final Queue<BackgroundTaskForkJoin> tasksInFlight;
+ private final AtomicReference<Boolean> isShutdown;
+ private final ScheduledExecutorService scheduledExecuterService;
+
+ public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) {
+ this.tasksInFlight = new LinkedList<>();
+ this.isShutdown = BackgroundService.this.isShutdown;
+ this.scheduledExecuterService = scheduledExecutorService;
+ }
+
+ private PeriodicalTask(PeriodicalTask other) {
+ this.tasksInFlight = other.tasksInFlight;
+ this.isShutdown = other.isShutdown;
+ this.scheduledExecuterService = other.scheduledExecuterService;
Review Comment:
Field name `scheduledExecuterService` is misspelled ("Executer" vs
"Executor"). Renaming to `scheduledExecutorService` would improve readability
and avoid propagating the typo to other code.
```suggestion
private final ScheduledExecutorService scheduledExecutorService;
public PeriodicalTask(ScheduledExecutorService scheduledExecutorService)
{
this.tasksInFlight = new LinkedList<>();
this.isShutdown = BackgroundService.this.isShutdown;
this.scheduledExecutorService = scheduledExecutorService;
}
private PeriodicalTask(PeriodicalTask other) {
this.tasksInFlight = other.tasksInFlight;
this.isShutdown = other.isShutdown;
this.scheduledExecutorService = other.scheduledExecutorService;
```
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java:
##########
@@ -705,9 +724,9 @@ private boolean processDeletedDirectories(SnapshotInfo
currentSnapshotInfo, KeyM
subFileNum += purgePathRequest.getDeletedSubFilesCount();
}
- optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum,
+ dds.optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum,
subFileNum, allSubDirList, purgePathRequestList, snapshotTableKey,
- startTime, getOzoneManager().getKeyManager(),
+ startTime, dds.getOzoneManager().getKeyManager(),
Review Comment:
Access of an element annotated with `@VisibleForTesting` found in production
code.
```suggestion
startTime, keyManager,
```
##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +142,183 @@ protected void execTaskCompletion() { }
* Run one or more background tasks concurrently.
* Wait until all tasks to return the result.
*/
- public class PeriodicalTask implements Runnable {
- @Override
- public void run() {
- // wait for previous set of tasks to complete
- try {
- future.join();
- } catch (RuntimeException e) {
- LOG.error("Background service execution failed.", e);
- } finally {
- execTaskCompletion();
- }
+ public class PeriodicalTask extends RecursiveAction {
+ private final Queue<BackgroundTaskForkJoin> tasksInFlight;
+ private final AtomicReference<Boolean> isShutdown;
+ private final ScheduledExecutorService scheduledExecuterService;
+
+ public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) {
+ this.tasksInFlight = new LinkedList<>();
+ this.isShutdown = BackgroundService.this.isShutdown;
+ this.scheduledExecuterService = scheduledExecutorService;
+ }
+
+ private PeriodicalTask(PeriodicalTask other) {
+ this.tasksInFlight = other.tasksInFlight;
+ this.isShutdown = other.isShutdown;
+ this.scheduledExecuterService = other.scheduledExecuterService;
+ }
+
+ private boolean performIfNotShutdown(Runnable runnable) {
+ return isShutdown.updateAndGet((shutdown) -> {
+ if (!shutdown) {
+ runnable.run();
+ }
+ return shutdown;
+ });
+ }
+ private <T> boolean performIfNotShutdown(Consumer<T> consumer, T t) {
+ return isShutdown.updateAndGet((shutdown) -> {
+ if (!shutdown) {
+ consumer.accept(t);
+ }
+ return shutdown;
+ });
+ }
+
+ private boolean runTasks() {
if (LOG.isDebugEnabled()) {
LOG.debug("Running background service : {}", serviceName);
}
- BackgroundTaskQueue tasks = getTasks();
+ if (isShutdown.get()) {
+ return false;
+ }
+ if (!tasksInFlight.isEmpty()) {
+ LOG.warn("Tasks are still in flight service {}. This should not happen
schedule should only begin once all " +
+ "tasks from schedules have completed execution.", serviceName);
+ tasksInFlight.clear();
+ }
+
+ BackgroundTaskQueue tasks = getTasks(true);
if (tasks.isEmpty()) {
// No task found, or some problems to init tasks
// return and retry in next interval.
- return;
+ return false;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Number of background tasks to execute : {}", tasks.size());
}
- synchronized (BackgroundService.this) {
- while (!tasks.isEmpty()) {
- BackgroundTask task = tasks.poll();
- future = future.thenCombine(CompletableFuture.runAsync(() -> {
- long startTime = System.nanoTime();
- try {
- BackgroundTaskResult result = task.call();
- if (LOG.isDebugEnabled()) {
- LOG.debug("task execution result size {}", result.getSize());
- }
- } catch (Throwable e) {
- LOG.error("Background task execution failed", e);
- if (e instanceof Error) {
- throw (Error) e;
- }
- } finally {
- long endTime = System.nanoTime();
- if (endTime - startTime > serviceTimeoutInNanos) {
- LOG.warn("{} Background task execution took {}ns >
{}ns(timeout)",
- serviceName, endTime - startTime, serviceTimeoutInNanos);
- }
- }
- }, exec).exceptionally(e -> null), (Void1, Void) -> null);
+ Consumer<BackgroundTaskForkJoin> taskForkHandler = task -> {
+ task.fork();
+ tasksInFlight.offer(task);
+ };
+ while (!tasks.isEmpty()) {
+ BackgroundTask task = tasks.poll();
+ // Wrap the task in a ForkJoin wrapper and fork it.
+ BackgroundTaskForkJoin forkJoinTask = new BackgroundTaskForkJoin(task);
+ if (performIfNotShutdown(taskForkHandler, forkJoinTask)) {
+ return false;
+ }
+ }
+ Consumer<BackgroundTaskForkJoin> taskCompletionHandler = task -> {
+ BackgroundTaskForkJoin.BackgroundTaskForkResult result = task.join();
+ // Check for exception first in the task execution.
+ if (result.getThrowable() != null) {
+ LOG.error("Background task execution failed", result.getThrowable());
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("task execution result size {}",
result.getResult().getSize());
+ }
+ }
+ if (result.getTotalExecutionTime() > serviceTimeoutInNanos) {
+ LOG.warn("{} Background task execution took {}ns > {}ns(timeout)",
+ serviceName, result.getTotalExecutionTime(),
serviceTimeoutInNanos);
+ }
+ };
+ while (!tasksInFlight.isEmpty()) {
+ BackgroundTaskForkJoin taskInFlight = tasksInFlight.poll();
+ // Join the tasks forked before and wait for the result one by one.
+ if (performIfNotShutdown(taskCompletionHandler, taskInFlight)) {
+ return false;
}
}
+ return true;
+ }
+
+ private void scheduleNextTask() {
+ performIfNotShutdown(() -> {
+ if (scheduledExecuterService != null) {
+ scheduledExecuterService.schedule(() -> exec.submit(new
PeriodicalTask(this)),
+ intervalInMillis, TimeUnit.MILLISECONDS);
+ }
+ });
+ }
+
+ @Override
+ public void compute() {
+ future = new CompletableFuture<>();
+ if (runTasks()) {
+ scheduleNextTask();
+ } else {
+ LOG.debug("Service {} is shutdown. Cancelling all schedules of all
tasks.", serviceName);
+ }
+ future.complete(null);
}
}
// shutdown and make sure all threads are properly released.
- public synchronized void shutdown() {
+ public void shutdown() {
LOG.info("Shutting down service {}", this.serviceName);
- exec.shutdown();
- try {
- if (!exec.awaitTermination(60, TimeUnit.SECONDS)) {
- exec.shutdownNow();
+ final ThreadGroup threadGroupToBeClosed;
+ final ForkJoinPool execToShutdown;
+ final UncheckedAutoCloseableSupplier<ScheduledExecutorService>
periodicTaskSchedulerToBeClosed;
+ // Set the shutdown flag to true to prevent new tasks from being submitted.
+ synchronized (this) {
+ periodicTaskSchedulerToBeClosed = periodicTaskScheduler;
+ threadGroupToBeClosed = threadGroup;
+ execToShutdown = exec;
+ exec = null;
+ threadGroup = null;
+ periodicTaskScheduler = null;
+ if (isShutdown != null) {
+ this.isShutdown.set(true);
+ }
+ isShutdown = null;
+ }
+ if (execToShutdown != null) {
+ execToShutdown.shutdown();
+ try {
+ if (!execToShutdown.awaitTermination(60, TimeUnit.SECONDS)) {
+ execToShutdown.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ // Re-interrupt the thread while catching InterruptedException
+ Thread.currentThread().interrupt();
+ execToShutdown.shutdownNow();
}
- } catch (InterruptedException e) {
- // Re-interrupt the thread while catching InterruptedException
- Thread.currentThread().interrupt();
- exec.shutdownNow();
}
- if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) {
- threadGroup.destroy();
+ if (periodicTaskSchedulerToBeClosed != null) {
+ periodicTaskSchedulerToBeClosed.close();
+ }
+ if (threadGroupToBeClosed != null && !threadGroupToBeClosed.isDestroyed())
{
+ threadGroupToBeClosed.destroy();
Review Comment:
`shutdown()` calls `threadGroupToBeClosed.destroy()` without ensuring the
group has no active threads. `ThreadGroup.destroy()` throws
`IllegalThreadStateException` if the group still has active threads, which can
make shutdown fail unexpectedly. Consider checking `activeCount()==0` (as
before), or catching/handling the exception and/or waiting longer for worker
termination before destroying the group.
```suggestion
try {
int activeCount = threadGroupToBeClosed.activeCount();
if (activeCount == 0) {
threadGroupToBeClosed.destroy();
} else {
LOG.warn("Skipping destroy of thread group {} as it still has {}
active threads",
threadGroupToBeClosed.getName(), activeCount);
}
} catch (IllegalThreadStateException e) {
LOG.warn("Failed to destroy thread group {} due to active threads",
threadGroupToBeClosed.getName(), e);
}
```
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java:
##########
@@ -688,10 +707,10 @@ private boolean processDeletedDirectories(SnapshotInfo
currentSnapshotInfo, KeyM
.build());
boolean isDirReclaimable =
reclaimableDirFilter.apply(pendingDeletedDirInfo);
- Optional<PurgePathRequest> request = prepareDeleteDirRequest(
+ Optional<PurgePathRequest> request = dds.prepareDeleteDirRequest(
pendingDeletedDirInfo.getValue(),
pendingDeletedDirInfo.getKey(), isDirReclaimable, allSubDirList,
- getOzoneManager().getKeyManager(), reclaimableFileFilter,
remainNum);
+ dds.getOzoneManager().getKeyManager(), reclaimableFileFilter,
remainNum);
Review Comment:
Access of an element annotated with `@VisibleForTesting` found in production
code.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java:
##########
@@ -596,26 +587,54 @@ void processDeletedDirsForStore(SnapshotInfo
currentSnapshotInfo, KeyManager key
// This is to avoid race condition b/w purge request and snapshot
chain update. For AOS taking the global
// snapshotId since AOS could process multiple buckets in one
iteration. While using path
// previous snapshotId for a snapshot since it would process only one
bucket.
+ SnapshotChainManager snapshotChainManager = dds.snapshotChainManager;
UUID expectedPreviousSnapshotId = currentSnapshotInfo == null ?
snapshotChainManager.getLatestGlobalSnapshotId() :
SnapshotUtils.getPreviousSnapshotId(currentSnapshotInfo,
snapshotChainManager);
Map<UUID, Pair<Long, Long>> exclusiveSizeMap = Maps.newConcurrentMap();
- CompletableFuture<Boolean> processedAllDeletedDirs =
CompletableFuture.completedFuture(true);
- for (int i = 0; i < numberOfParallelThreadsPerStore; i++) {
- CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(()
-> {
- try {
- return processDeletedDirectories(currentSnapshotInfo,
keyManager, dirSupplier,
- expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt,
remainNum);
- } catch (Throwable e) {
- return false;
- }
- }, isThreadPoolActive(deletionThreadPool) ? deletionThreadPool :
ForkJoinPool.commonPool());
- processedAllDeletedDirs =
processedAllDeletedDirs.thenCombine(future, (a, b) -> a && b);
+ boolean processedAllDeletedDirs;
+ int maxForksPerStore = dds.maxForksPerStore;
+ // If allowed to fork, create multiple tasks to process deleted
directories tasks in parallel.
+ if (allowForks) {
+ Queue<RecursiveTask<Boolean>> recursiveTasks = new LinkedList<>();
+ processedAllDeletedDirs = true;
+ for (int i = 0; i < maxForksPerStore; i++) {
+ RecursiveTask<Boolean> task = new RecursiveTask<Boolean>() {
+ private static final long serialVersionUID = 1L;
+ private final transient SnapshotInfo snapshotInfo =
currentSnapshotInfo;
+ private final transient DeletedDirSupplier deletedDirSupplier =
dirSupplier;
+ private final transient KeyManager km = keyManager;
+
+ @Override
+ protected Boolean compute() {
+ try {
+ return processDeletedDirectories(snapshotInfo, km,
deletedDirSupplier,
+ expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt,
remainNum);
+ } catch (Throwable e) {
+ return false;
+ }
+ }
+ };
+ task.fork();
+ recursiveTasks.offer(task);
Review Comment:
Method `processDeletedDirsForStore` ignores the exceptional return value of
`Queue<RecursiveTask<Boolean>>.offer`.
```suggestion
if (!recursiveTasks.offer(task)) {
// If the task cannot be enqueued, ensure it is joined and
// mark that not all deleted directories were processed.
task.join();
processedAllDeletedDirs = false;
break;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]