This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit f7cd5cab83228cd8e7bb77842976d1f9ed6ded43 Author: Michael Lück <[email protected]> AuthorDate: Tue Nov 29 19:06:35 2022 +0100 Refactoring BackgroundTask - made latch and completed into members. As the BackgroundTask is stateful, we can just add the latch and the completed flag (introduced for CAMEL-18766) as members of the BackgroundTask (see discussion in GitHub pull request 8785) --- .../apache/camel/support/task/BackgroundTask.java | 30 ++++++++-------------- .../apache/camel/support/task/BlockingTask.java | 2 +- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java index 035e795b422..6fed38feb10 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java @@ -24,7 +24,6 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; -import java.util.function.Predicate; import org.apache.camel.support.task.budget.TimeBoundedBudget; import org.apache.camel.support.task.budget.TimeBudget; @@ -77,7 +76,10 @@ public class BackgroundTask implements BlockingTask { private final TimeBudget budget; private final ScheduledExecutorService service; private final String name; + private final CountDownLatch latch = new CountDownLatch(1); + private Duration elapsed = Duration.ZERO; + private boolean completed; BackgroundTask(TimeBudget budget, ScheduledExecutorService service, String name) { this.budget = budget; @@ -85,7 +87,7 @@ public class BackgroundTask implements BlockingTask { this.name = name; } - private void runTaskWrapper(CountDownLatch latch, BooleanSupplier supplier, ExecutionResult result) { + private void runTaskWrapper(BooleanSupplier supplier) { 
LOG.trace("Current latch value: {}", latch.getCount()); if (latch.getCount() == 0) { return; @@ -93,13 +95,13 @@ public class BackgroundTask implements BlockingTask { if (!budget.next()) { LOG.warn("The task {} does not have more budget to continue running", name); - result.completed = false; + completed = false; latch.countDown(); return; } if (supplier.getAsBoolean()) { - result.completed = true; + completed = true; latch.countDown(); LOG.trace("Task {} succeeded and the current task won't be schedulable anymore: {}", name, latch.getCount()); } @@ -107,21 +109,15 @@ public class BackgroundTask implements BlockingTask { @Override public boolean run(BooleanSupplier supplier) { - final CountDownLatch latch = new CountDownLatch(1); - // we need a wrapper for the actual result that will be defined in the runTaskWrapper method which - // is executed by the by ExecutorService - // the result will define whether the task executed successfully or has been ended because the budget has been consumed - // that will be used as the return value of this method (see description of super.run(...) methods - final ExecutionResult result = new ExecutionResult(); - - Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier, result), budget.initialDelay(), + + Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(supplier), budget.initialDelay(), budget.interval(), TimeUnit.MILLISECONDS); - waitForTaskCompletion(latch, task); - return result.completed; + waitForTaskCompletion(task); + return completed; } - private void waitForTaskCompletion(CountDownLatch latch, Future<?> task) { + private void waitForTaskCompletion(Future<?> task) { try { // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic. // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch. 
@@ -148,8 +144,4 @@ public class BackgroundTask implements BlockingTask { public Duration elapsed() { return elapsed; } - - private static class ExecutionResult { - private boolean completed; - } } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java index 757a7bcea20..ec17ea6f980 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java @@ -37,7 +37,7 @@ public interface BlockingTask extends Task { * task was interrupted. */ default <T> boolean run(Predicate<T> predicate, T payload) { - return this.run(() -> predicate.test(payload)); + return this.run(() -> predicate.test(payload)); } /**
