This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit bf6d3a0f75f18564988f785f7cebacf0663103e9
Author: Michael Lück <[email protected]>
AuthorDate: Sat Nov 26 11:37:03 2022 +0100

    CAMEL-18766: background tasks without maxDuration are reeschedulable
    
    Make sure BackgroundTask Thread is ended after maxIterations even if
    unlimitedMaxDurations is used but maintain possibility to retry forever
    of maxIterations has not been set.
    
    Also some code cleanup: encapsulate predicate execution into
    Supplier<Boolean>. This allows that we only need one runTaskWrapper and
    reduces code duplication. For example we can fix the maxIteration bug in
    a single place
    
    Also: minor change in comment, Co-authored-by: Nicolas Filotto 
<[email protected]>
---
 .../apache/camel/support/task/BackgroundTask.java  | 56 ++++++++--------------
 .../task/BackgroundIterationTimeTaskTest.java      |  3 +-
 2 files changed, 21 insertions(+), 38 deletions(-)

diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
index 4909107e496..c7e63fb50e9 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
@@ -85,38 +85,21 @@ public class BackgroundTask implements BlockingTask {
         this.name = name;
     }
 
-    private <T> void runTaskWrapper(CountDownLatch latch, Predicate<T> 
predicate, T payload) {
+    private void runTaskWrapper(CountDownLatch latch, BooleanSupplier 
supplier, ExecutionResult result) {
         LOG.trace("Current latch value: {}", latch.getCount());
-
         if (latch.getCount() == 0) {
             return;
         }
 
         if (!budget.next()) {
             LOG.warn("The task {} does not have more budget to continue 
running", name);
-
-            return;
-        }
-
-        if (predicate.test(payload)) {
+            result.completed = false;
             latch.countDown();
-            LOG.trace("Task {} has succeeded and the current task won't be 
schedulable anymore: {}", name, latch.getCount());
-        }
-    }
-
-    private void runTaskWrapper(CountDownLatch latch, BooleanSupplier 
supplier) {
-        LOG.trace("Current latch value: {}", latch.getCount());
-        if (latch.getCount() == 0) {
-            return;
-        }
-
-        if (!budget.next()) {
-            LOG.warn("The task {} does not have more budget to continue 
running", name);
-
             return;
         }
 
         if (supplier.getAsBoolean()) {
+            result.completed = true;
             latch.countDown();
             LOG.trace("Task {} succeeded and the current task won't be 
schedulable anymore: {}", name, latch.getCount());
         }
@@ -124,39 +107,36 @@ public class BackgroundTask implements BlockingTask {
 
     @Override
     public <T> boolean run(Predicate<T> predicate, T payload) {
-        CountDownLatch latch = new CountDownLatch(1);
-
-        Future<?> task = service.scheduleAtFixedRate(() -> 
runTaskWrapper(latch, predicate, payload),
-                budget.initialDelay(), budget.interval(), 
TimeUnit.MILLISECONDS);
-
-        return waitForTaskCompletion(latch, task);
+        return this.run(() -> predicate.test(payload));
     }
 
     @Override
     public boolean run(BooleanSupplier supplier) {
-        CountDownLatch latch = new CountDownLatch(1);
-
-        Future<?> task = service.scheduleAtFixedRate(() -> 
runTaskWrapper(latch, supplier), budget.initialDelay(),
+        final CountDownLatch latch = new CountDownLatch(1);
+        // we need a wrapper for the actual result that will be defined in the 
runTaskWrapper method which 
+        // is executed by the by ExecutorService
+        // the result will define whether the task executed successfully or 
has been ended because the budget has been consumed
+        // that will be used as the return value of this method (see 
description of super.run(...) methods
+        final ExecutionResult result = new ExecutionResult();
+
+        Future<?> task = service.scheduleAtFixedRate(() -> 
runTaskWrapper(latch, supplier, result), budget.initialDelay(),
                 budget.interval(), TimeUnit.MILLISECONDS);
 
-        return waitForTaskCompletion(latch, task);
+        waitForTaskCompletion(latch, task);
+        return result.completed;
     }
 
-    private boolean waitForTaskCompletion(CountDownLatch latch, Future<?> 
task) {
-        boolean completed = false;
+    private void waitForTaskCompletion(CountDownLatch latch, Future<?> task) {
         try {
             // We need it to be cancellable/non-runnable after reaching a 
certain point, and it needs to be deterministic.
             // This is why we ignore the ScheduledFuture returned and 
implement the go/no-go using a latch.
             if (budget.maxDuration() == TimeBoundedBudget.UNLIMITED_DURATION) {
                 latch.await();
-                completed = true;
             } else {
                 if (!latch.await(budget.maxDuration(), TimeUnit.MILLISECONDS)) 
{
                     LOG.debug("Timeout out waiting for the completion of the 
task");
                 } else {
                     LOG.debug("The task has finished the execution and it is 
ready to continue");
-
-                    completed = true;
                 }
             }
 
@@ -167,12 +147,14 @@ public class BackgroundTask implements BlockingTask {
         } finally {
             elapsed = budget.elapsed();
         }
-
-        return completed;
     }
 
     @Override
     public Duration elapsed() {
         return elapsed;
     }
+
+    private static class ExecutionResult {
+        private boolean completed;
+    }
 }
diff --git 
a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
 
b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
index cd5f7fb83e5..b7f112e1a2c 100644
--- 
a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
+++ 
b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
@@ -42,7 +42,8 @@ public class BackgroundIterationTimeTaskTest extends 
TaskTestSupport {
                         .withMaxIterations(3)
                         .withInterval(Duration.ofSeconds(1))
                         .withInitialDelay(Duration.ZERO)
-                        .withMaxDuration(Duration.ofSeconds(5))
+                        // use unlimited duration so we're sure that the task 
is really canceled after maxIterations
+                        .withUnlimitedDuration()
                         .build())
                 .build();
 

Reply via email to