This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 12e7338a5c1cafbb3ea3bca7d5d67332f9ce462f Author: Guillaume Nodet <gno...@gmail.com> AuthorDate: Thu May 16 18:10:20 2024 +0200 Fix busy-wait loops --- .../apache/camel/processor/StreamResequencer.java | 18 ++++---- .../errorhandler/RedeliveryErrorHandler.java | 1 + .../processor/resequencer/ResequencerEngine.java | 49 +++++++++++++++++++++- 3 files changed, 57 insertions(+), 11 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java index 2771df6c9fb..efc951df726 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java @@ -239,16 +239,14 @@ public class StreamResequencer extends AsyncProcessorSupport @Override public boolean process(Exchange exchange, AsyncCallback callback) { - while (engine.size() >= capacity) { - try { - Thread.sleep(getTimeout()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // we were interrupted so break out - exchange.setException(e); - callback.done(true); - return true; - } + try { + engine.waitUntil(s -> s.size() < capacity); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // we were interrupted so break out + exchange.setException(e); + callback.done(true); + return true; } try { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index 462980cdb45..164958fb736 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -1530,6 +1530,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport * This task is for the synchronous blocking. If using async delayed then a scheduled thread pool is used for * sleeping and trigger redeliveries. */ + @SuppressWarnings("BusyWait") public boolean sleep() throws InterruptedException { // for small delays then just sleep if (redeliveryDelay < 1000) { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java index dc7bed62ec9..ff66e58a3b7 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java @@ -16,7 +16,11 @@ */ package org.apache.camel.processor.resequencer; +import java.util.HashMap; +import java.util.Map; import java.util.Timer; +import java.util.concurrent.CountDownLatch; +import java.util.function.Predicate; import org.apache.camel.util.concurrent.ThreadHelper; @@ -78,6 +82,12 @@ public class ResequencerEngine<E> { */ private Boolean rejectOld; + /** + * List containing wait conditions to be evaluated whenever the sequence is modified. Access to this field should be + * done inside a {@code synchronized(this)} block. + */ + private Map<CountDownLatch, Predicate<Sequence<?>>> waitConditions = new HashMap<>(); + /** * Creates a new resequencer instance with a default timeout of 2000 milliseconds. * @@ -110,6 +120,37 @@ public class ResequencerEngine<E> { return sequence.size(); } + /** + * Wait for the following condition to happen. Do not call this method while holding a lock on the resequencer + * engine, as it will deadlock. The predicate will be evaluated while holding a lock on the resequencer engine. + * + * @param pred the condition to wait for + * @throws InterruptedException if the thread is interrupted + */ + public void waitUntil(Predicate<Sequence<?>> pred) throws InterruptedException { + CountDownLatch latch; + synchronized (this) { + if (pred.test(sequence)) { + return; + } + latch = new CountDownLatch(1); + waitConditions.put(latch, pred); + } + latch.await(); + } + + private void evaluateConditions() { + synchronized (this) { + for (var it = waitConditions.entrySet().iterator(); it.hasNext();) { + Map.Entry<CountDownLatch, Predicate<Sequence<?>>> e = it.next(); + if (e.getValue().test(sequence)) { + e.getKey().countDown(); + it.remove(); + } + } + } + } + /** * Returns this resequencer's timeout value. * @@ -214,6 +255,9 @@ public class ResequencerEngine<E> { if (!successorOfLastDelivered(element) && sequence.predecessor(element) == null) { element.schedule(defineTimeout()); } + + // evaluate wait conditions + evaluateConditions(); } /** @@ -240,7 +284,7 @@ public class ResequencerEngine<E> { * @throws Exception thrown by {@link SequenceSender#sendElement(Object)}. * */ - public boolean deliverNext() throws Exception { + public synchronized boolean deliverNext() throws Exception { if (sequence.isEmpty()) { return false; } @@ -261,6 +305,9 @@ public class ResequencerEngine<E> { // deliver the sequence element sequenceSender.sendElement(element.getObject()); + // evaluate wait conditions + evaluateConditions(); + // element has been delivered return true; }