This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit bf6d3a0f75f18564988f785f7cebacf0663103e9 Author: Michael Lück <[email protected]> AuthorDate: Sat Nov 26 11:37:03 2022 +0100 CAMEL-18766: background tasks without maxDuration are reeschedulable Make sure BackgroundTask Thread is ended after maxIterations even if unlimitedMaxDurations is used but maintain possibility to retry forever of maxIterations has not been set. Also some code cleanup: encapsulate predicate execution into Supplier<Boolean>. This allows that we only need one runTaskWrapper and reduces code duplication. For example we can fix the maxIteration bug in a single place Also: minor change in comment, Co-authored-by: Nicolas Filotto <[email protected]> --- .../apache/camel/support/task/BackgroundTask.java | 56 ++++++++-------------- .../task/BackgroundIterationTimeTaskTest.java | 3 +- 2 files changed, 21 insertions(+), 38 deletions(-) diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java index 4909107e496..c7e63fb50e9 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java @@ -85,38 +85,21 @@ public class BackgroundTask implements BlockingTask { this.name = name; } - private <T> void runTaskWrapper(CountDownLatch latch, Predicate<T> predicate, T payload) { + private void runTaskWrapper(CountDownLatch latch, BooleanSupplier supplier, ExecutionResult result) { LOG.trace("Current latch value: {}", latch.getCount()); - if (latch.getCount() == 0) { return; } if (!budget.next()) { LOG.warn("The task {} does not have more budget to continue running", name); - - return; - } - - if (predicate.test(payload)) { + result.completed = false; latch.countDown(); - LOG.trace("Task {} has succeeded and the current task won't be schedulable anymore: {}", name, latch.getCount()); - } - } - - private void runTaskWrapper(CountDownLatch latch, BooleanSupplier supplier) { - LOG.trace("Current latch value: {}", latch.getCount()); - if (latch.getCount() == 0) { - return; - } - - if (!budget.next()) { - LOG.warn("The task {} does not have more budget to continue running", name); - return; } if (supplier.getAsBoolean()) { + result.completed = true; latch.countDown(); LOG.trace("Task {} succeeded and the current task won't be schedulable anymore: {}", name, latch.getCount()); } @@ -124,39 +107,36 @@ public class BackgroundTask implements BlockingTask { @Override public <T> boolean run(Predicate<T> predicate, T payload) { - CountDownLatch latch = new CountDownLatch(1); - - Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, predicate, payload), - budget.initialDelay(), budget.interval(), TimeUnit.MILLISECONDS); - - return waitForTaskCompletion(latch, task); + return this.run(() -> predicate.test(payload)); } @Override public boolean run(BooleanSupplier supplier) { - CountDownLatch latch = new CountDownLatch(1); - - Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier), budget.initialDelay(), + final CountDownLatch latch = new CountDownLatch(1); + // we need a wrapper for the actual result that will be defined in the runTaskWrapper method which + // is executed by the by ExecutorService + // the result will define whether the task executed successfully or has been ended because the budget has been consumed + // that will be used as the return value of this method (see description of super.run(...) methods + final ExecutionResult result = new ExecutionResult(); + + Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier, result), budget.initialDelay(), budget.interval(), TimeUnit.MILLISECONDS); - return waitForTaskCompletion(latch, task); + waitForTaskCompletion(latch, task); + return result.completed; } - private boolean waitForTaskCompletion(CountDownLatch latch, Future<?> task) { - boolean completed = false; + private void waitForTaskCompletion(CountDownLatch latch, Future<?> task) { try { // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic. // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch. if (budget.maxDuration() == TimeBoundedBudget.UNLIMITED_DURATION) { latch.await(); - completed = true; } else { if (!latch.await(budget.maxDuration(), TimeUnit.MILLISECONDS)) { LOG.debug("Timeout out waiting for the completion of the task"); } else { LOG.debug("The task has finished the execution and it is ready to continue"); - - completed = true; } } @@ -167,12 +147,14 @@ public class BackgroundTask implements BlockingTask { } finally { elapsed = budget.elapsed(); } - - return completed; } @Override public Duration elapsed() { return elapsed; } + + private static class ExecutionResult { + private boolean completed; + } } diff --git a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java index cd5f7fb83e5..b7f112e1a2c 100644 --- a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java +++ b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java @@ -42,7 +42,8 @@ public class BackgroundIterationTimeTaskTest extends TaskTestSupport { .withMaxIterations(3) .withInterval(Duration.ofSeconds(1)) .withInitialDelay(Duration.ZERO) - .withMaxDuration(Duration.ofSeconds(5)) + // use unlimited duration so we're sure that the task is really canceled after maxIterations + .withUnlimitedDuration() .build()) .build();
