sumitagrawl commented on code in PR #9390:
URL: https://github.com/apache/ozone/pull/9390#discussion_r2575566273
##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTask.java:
##########
@@ -18,16 +18,56 @@
package org.apache.hadoop.hdds.utils;
import java.util.concurrent.Callable;
+import java.util.concurrent.RecursiveTask;
/**
* A task thread to run by {@link BackgroundService}.
*/
-public interface BackgroundTask extends Callable<BackgroundTaskResult> {
+public abstract class BackgroundTask extends
RecursiveTask<BackgroundTask.BackgroundTaskForkResult>
Review Comment:
There is no need to change BackgroundTask; instead, encapsulate it inside a
ForkJoinTask as a transient wrapper so that it can be scheduled. This avoids
changing all the other places because of the change in framework. Externally,
it is just a Runnable task.
##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +138,148 @@ protected void execTaskCompletion() { }
* Run one or more background tasks concurrently.
* Wait until all tasks to return the result.
*/
- public class PeriodicalTask implements Runnable {
- @Override
- public void run() {
- // wait for previous set of tasks to complete
- try {
- future.join();
- } catch (RuntimeException e) {
- LOG.error("Background service execution failed.", e);
- } finally {
- execTaskCompletion();
- }
+ public class PeriodicalTask extends RecursiveAction {
+ private int numberOfLoops;
+ private final Queue<BackgroundTask> tasksInFlight;
+ private final AtomicBoolean isShutdown;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Running background service : {}", serviceName);
+ public PeriodicalTask(int numberOfLoops) {
+ this.numberOfLoops = numberOfLoops;
+ this.tasksInFlight = new LinkedList<>();
+ this.isShutdown = BackgroundService.this.isShutdown;
+ }
+
+ private boolean waitForNextInterval() {
+
+ if (numberOfLoops > 0) {
+ numberOfLoops--;
+ if (numberOfLoops == 0) {
+ return false;
+ }
}
- BackgroundTaskQueue tasks = getTasks();
- if (tasks.isEmpty()) {
- // No task found, or some problems to init tasks
- // return and retry in next interval.
- return;
+ // Check if the executor has been shutdown during task execution.
+ if (!isShutdown.get()) {
+ synchronized (BackgroundService.this) {
+ // Get the shutdown flag again after acquiring the lock.
+ if (isShutdown.get()) {
+ return false;
+ }
+ try {
+ BackgroundService.this.wait(intervalInMillis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted while waiting for next interval.", e);
+ return false;
+ }
+ }
}
+ return !isShutdown.get();
+ }
+
+ @Override
+ public void compute() {
if (LOG.isDebugEnabled()) {
- LOG.debug("Number of background tasks to execute : {}", tasks.size());
+ LOG.debug("Running background service : {}", serviceName);
}
- synchronized (BackgroundService.this) {
+ boolean runAgain = true;
+ do {
+ future = new CompletableFuture<>();
+ BackgroundTaskQueue tasks = getTasks(true);
+ if (tasks.isEmpty()) {
+ // No task found, or some problems to init tasks
+ // return and retry in next interval.
+ continue;
Review Comment:
This can cause a continuous loop, because `continue` jumps directly to the
while condition without calling waitForNextInterval().
##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +138,148 @@ protected void execTaskCompletion() { }
* Run one or more background tasks concurrently.
* Wait until all tasks to return the result.
*/
- public class PeriodicalTask implements Runnable {
- @Override
- public void run() {
- // wait for previous set of tasks to complete
- try {
- future.join();
- } catch (RuntimeException e) {
- LOG.error("Background service execution failed.", e);
- } finally {
- execTaskCompletion();
- }
+ public class PeriodicalTask extends RecursiveAction {
+ private int numberOfLoops;
+ private final Queue<BackgroundTask> tasksInFlight;
+ private final AtomicBoolean isShutdown;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Running background service : {}", serviceName);
+ public PeriodicalTask(int numberOfLoops) {
+ this.numberOfLoops = numberOfLoops;
+ this.tasksInFlight = new LinkedList<>();
+ this.isShutdown = BackgroundService.this.isShutdown;
+ }
+
+ private boolean waitForNextInterval() {
+
+ if (numberOfLoops > 0) {
+ numberOfLoops--;
+ if (numberOfLoops == 0) {
+ return false;
+ }
}
- BackgroundTaskQueue tasks = getTasks();
- if (tasks.isEmpty()) {
- // No task found, or some problems to init tasks
- // return and retry in next interval.
- return;
+ // Check if the executor has been shutdown during task execution.
+ if (!isShutdown.get()) {
+ synchronized (BackgroundService.this) {
+ // Get the shutdown flag again after acquiring the lock.
+ if (isShutdown.get()) {
+ return false;
+ }
+ try {
+ BackgroundService.this.wait(intervalInMillis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted while waiting for next interval.", e);
+ return false;
+ }
+ }
}
+ return !isShutdown.get();
+ }
+
+ @Override
+ public void compute() {
if (LOG.isDebugEnabled()) {
- LOG.debug("Number of background tasks to execute : {}", tasks.size());
+ LOG.debug("Running background service : {}", serviceName);
}
- synchronized (BackgroundService.this) {
+ boolean runAgain = true;
+ do {
+ future = new CompletableFuture<>();
+ BackgroundTaskQueue tasks = getTasks(true);
+ if (tasks.isEmpty()) {
+ // No task found, or some problems to init tasks
+ // return and retry in next interval.
+ continue;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Number of background tasks to execute : {}",
tasks.size());
+ }
+
while (!tasks.isEmpty()) {
BackgroundTask task = tasks.poll();
- future = future.thenCombine(CompletableFuture.runAsync(() -> {
- long startTime = System.nanoTime();
- try {
- BackgroundTaskResult result = task.call();
- if (LOG.isDebugEnabled()) {
- LOG.debug("task execution result size {}", result.getSize());
- }
- } catch (Throwable e) {
- LOG.error("Background task execution failed", e);
- if (e instanceof Error) {
- throw (Error) e;
- }
- } finally {
- long endTime = System.nanoTime();
- if (endTime - startTime > serviceTimeoutInNanos) {
- LOG.warn("{} Background task execution took {}ns >
{}ns(timeout)",
- serviceName, endTime - startTime, serviceTimeoutInNanos);
- }
+ // Fork and submit the task back to executor.
+ task.fork();
+ tasksInFlight.offer(task);
+ }
+
+ while (!tasksInFlight.isEmpty()) {
+ BackgroundTask taskInFlight = tasksInFlight.poll();
+ // Join the tasks forked before and wait for the result one by one.
+ BackgroundTask.BackgroundTaskForkResult result = taskInFlight.join();
Review Comment:
Is the task cancellable if its execution exceeds a certain time?
##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +138,148 @@ protected void execTaskCompletion() { }
* Run one or more background tasks concurrently.
* Wait until all tasks to return the result.
*/
- public class PeriodicalTask implements Runnable {
- @Override
- public void run() {
- // wait for previous set of tasks to complete
- try {
- future.join();
- } catch (RuntimeException e) {
- LOG.error("Background service execution failed.", e);
- } finally {
- execTaskCompletion();
- }
+ public class PeriodicalTask extends RecursiveAction {
+ private int numberOfLoops;
+ private final Queue<BackgroundTask> tasksInFlight;
+ private final AtomicBoolean isShutdown;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Running background service : {}", serviceName);
+ public PeriodicalTask(int numberOfLoops) {
+ this.numberOfLoops = numberOfLoops;
+ this.tasksInFlight = new LinkedList<>();
+ this.isShutdown = BackgroundService.this.isShutdown;
+ }
+
+ private boolean waitForNextInterval() {
+
+ if (numberOfLoops > 0) {
+ numberOfLoops--;
+ if (numberOfLoops == 0) {
+ return false;
+ }
}
- BackgroundTaskQueue tasks = getTasks();
- if (tasks.isEmpty()) {
- // No task found, or some problems to init tasks
- // return and retry in next interval.
- return;
+ // Check if the executor has been shutdown during task execution.
+ if (!isShutdown.get()) {
+ synchronized (BackgroundService.this) {
+ // Get the shutdown flag again after acquiring the lock.
+ if (isShutdown.get()) {
+ return false;
+ }
+ try {
+ BackgroundService.this.wait(intervalInMillis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted while waiting for next interval.", e);
+ return false;
+ }
+ }
}
+ return !isShutdown.get();
+ }
+
+ @Override
+ public void compute() {
if (LOG.isDebugEnabled()) {
- LOG.debug("Number of background tasks to execute : {}", tasks.size());
+ LOG.debug("Running background service : {}", serviceName);
}
- synchronized (BackgroundService.this) {
+ boolean runAgain = true;
+ do {
+ future = new CompletableFuture<>();
+ BackgroundTaskQueue tasks = getTasks(true);
+ if (tasks.isEmpty()) {
+ // No task found, or some problems to init tasks
+ // return and retry in next interval.
+ continue;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Number of background tasks to execute : {}",
tasks.size());
+ }
+
while (!tasks.isEmpty()) {
BackgroundTask task = tasks.poll();
- future = future.thenCombine(CompletableFuture.runAsync(() -> {
- long startTime = System.nanoTime();
- try {
- BackgroundTaskResult result = task.call();
- if (LOG.isDebugEnabled()) {
- LOG.debug("task execution result size {}", result.getSize());
- }
- } catch (Throwable e) {
- LOG.error("Background task execution failed", e);
- if (e instanceof Error) {
- throw (Error) e;
- }
- } finally {
- long endTime = System.nanoTime();
- if (endTime - startTime > serviceTimeoutInNanos) {
- LOG.warn("{} Background task execution took {}ns >
{}ns(timeout)",
- serviceName, endTime - startTime, serviceTimeoutInNanos);
- }
+ // Fork and submit the task back to executor.
+ task.fork();
+ tasksInFlight.offer(task);
+ }
+
+ while (!tasksInFlight.isEmpty()) {
+ BackgroundTask taskInFlight = tasksInFlight.poll();
+ // Join the tasks forked before and wait for the result one by one.
+ BackgroundTask.BackgroundTaskForkResult result = taskInFlight.join();
+ // Check for exception first in the task execution.
+ if (result.getThrowable() != null) {
+ LOG.error("Background task execution failed",
result.getThrowable());
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("task execution result size {}",
result.getResult().getSize());
}
- }, exec).exceptionally(e -> null), (Void1, Void) -> null);
+ }
+ if (result.getTotalExecutionTime() > serviceTimeoutInNanos) {
+ LOG.warn("{} Background task execution took {}ns > {}ns(timeout)",
+ serviceName, result.getTotalExecutionTime(),
serviceTimeoutInNanos);
+ }
}
- }
+ future.complete(null);
Review Comment:
I am unable to see what the future is used for here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]