swamirishi commented on code in PR #9390:
URL: https://github.com/apache/ozone/pull/9390#discussion_r2579117162
##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java:
##########
@@ -138,84 +138,148 @@ protected void execTaskCompletion() { }
* Run one or more background tasks concurrently.
* Wait until all tasks to return the result.
*/
- public class PeriodicalTask implements Runnable {
- @Override
- public void run() {
- // wait for previous set of tasks to complete
- try {
- future.join();
- } catch (RuntimeException e) {
- LOG.error("Background service execution failed.", e);
- } finally {
- execTaskCompletion();
- }
+ public class PeriodicalTask extends RecursiveAction {
+ private int numberOfLoops;
+ private final Queue<BackgroundTask> tasksInFlight;
+ private final AtomicBoolean isShutdown;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Running background service : {}", serviceName);
+ public PeriodicalTask(int numberOfLoops) {
+ this.numberOfLoops = numberOfLoops;
+ this.tasksInFlight = new LinkedList<>();
+ this.isShutdown = BackgroundService.this.isShutdown;
+ }
+
+ private boolean waitForNextInterval() {
+
+ if (numberOfLoops > 0) {
+ numberOfLoops--;
+ if (numberOfLoops == 0) {
+ return false;
+ }
}
- BackgroundTaskQueue tasks = getTasks();
- if (tasks.isEmpty()) {
- // No task found, or some problems to init tasks
- // return and retry in next interval.
- return;
+ // Check if the executor has been shutdown during task execution.
+ if (!isShutdown.get()) {
+ synchronized (BackgroundService.this) {
+ // Get the shutdown flag again after acquiring the lock.
+ if (isShutdown.get()) {
+ return false;
+ }
+ try {
+ BackgroundService.this.wait(intervalInMillis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted while waiting for next interval.", e);
+ return false;
+ }
+ }
}
+ return !isShutdown.get();
+ }
+
+ @Override
+ public void compute() {
if (LOG.isDebugEnabled()) {
- LOG.debug("Number of background tasks to execute : {}", tasks.size());
+ LOG.debug("Running background service : {}", serviceName);
}
- synchronized (BackgroundService.this) {
+ boolean runAgain = true;
+ do {
+ future = new CompletableFuture<>();
+ BackgroundTaskQueue tasks = getTasks(true);
+ if (tasks.isEmpty()) {
+ // No task found, or some problems to init tasks
+ // return and retry in next interval.
+ continue;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Number of background tasks to execute : {}",
tasks.size());
+ }
+
while (!tasks.isEmpty()) {
BackgroundTask task = tasks.poll();
- future = future.thenCombine(CompletableFuture.runAsync(() -> {
- long startTime = System.nanoTime();
- try {
- BackgroundTaskResult result = task.call();
- if (LOG.isDebugEnabled()) {
- LOG.debug("task execution result size {}", result.getSize());
- }
- } catch (Throwable e) {
- LOG.error("Background task execution failed", e);
- if (e instanceof Error) {
- throw (Error) e;
- }
- } finally {
- long endTime = System.nanoTime();
- if (endTime - startTime > serviceTimeoutInNanos) {
- LOG.warn("{} Background task execution took {}ns >
{}ns(timeout)",
- serviceName, endTime - startTime, serviceTimeoutInNanos);
- }
+ // Fork and submit the task back to executor.
+ task.fork();
+ tasksInFlight.offer(task);
+ }
+
+ while (!tasksInFlight.isEmpty()) {
+ BackgroundTask taskInFlight = tasksInFlight.poll();
+ // Join the tasks forked before and wait for the result one by one.
+ BackgroundTask.BackgroundTaskForkResult result = taskInFlight.join();
Review Comment:
We can implement something if we want to. But this is not supported even
today.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]