This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 12e7338a5c1cafbb3ea3bca7d5d67332f9ce462f
Author: Guillaume Nodet <gno...@gmail.com>
AuthorDate: Thu May 16 18:10:20 2024 +0200

    Fix busy-wait loops
---
 .../apache/camel/processor/StreamResequencer.java  | 18 ++++----
 .../errorhandler/RedeliveryErrorHandler.java       |  1 +
 .../processor/resequencer/ResequencerEngine.java   | 49 +++++++++++++++++++++-
 3 files changed, 57 insertions(+), 11 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
index 2771df6c9fb..efc951df726 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
@@ -239,16 +239,14 @@ public class StreamResequencer extends 
AsyncProcessorSupport
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        while (engine.size() >= capacity) {
-            try {
-                Thread.sleep(getTimeout());
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                // we were interrupted so break out
-                exchange.setException(e);
-                callback.done(true);
-                return true;
-            }
+        try {
+            engine.waitUntil(s -> s.size() < capacity);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            // we were interrupted so break out
+            exchange.setException(e);
+            callback.done(true);
+            return true;
         }
 
         try {
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 462980cdb45..164958fb736 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -1530,6 +1530,7 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
          * This task is for the synchronous blocking. If using async delayed 
then a scheduled thread pool is used for
          * sleeping and trigger redeliveries.
          */
+        @SuppressWarnings("BusyWait")
         public boolean sleep() throws InterruptedException {
             // for small delays then just sleep
             if (redeliveryDelay < 1000) {
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
index dc7bed62ec9..ff66e58a3b7 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
@@ -16,7 +16,11 @@
  */
 package org.apache.camel.processor.resequencer;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Timer;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Predicate;
 
 import org.apache.camel.util.concurrent.ThreadHelper;
 
@@ -78,6 +82,12 @@ public class ResequencerEngine<E> {
      */
     private Boolean rejectOld;
 
+    /**
+     * Map of pending wait conditions, keyed by the latch that releases the waiting thread. The conditions are
+     * evaluated whenever the sequence is modified. Access to this field should be done inside a
+     * {@code synchronized(this)} block.
+     */
+    private Map<CountDownLatch, Predicate<Sequence<?>>> waitConditions = new 
HashMap<>();
+
     /**
      * Creates a new resequencer instance with a default timeout of 2000 
milliseconds.
      *
@@ -110,6 +120,37 @@ public class ResequencerEngine<E> {
         return sequence.size();
     }
 
+    /**
+     * Blocks the calling thread until the given condition holds for this engine's sequence.
+     * <p>
+     * The predicate is tested immediately and, if it does not hold yet, registered so that it is tested again each
+     * time the wait conditions are evaluated. It is always tested while holding the lock on this resequencer engine.
+     * Do not call this method while holding a lock on the resequencer engine, as it will deadlock.
+     * <p>
+     * If the waiting thread is interrupted, its wait condition is unregistered before the exception is propagated,
+     * so that abandoned conditions do not accumulate.
+     *
+     * @param  pred                 the condition to wait for
+     * @throws InterruptedException if the thread is interrupted
+     */
+    public void waitUntil(Predicate<Sequence<?>> pred) throws InterruptedException {
+        CountDownLatch latch;
+        synchronized (this) {
+            if (pred.test(sequence)) {
+                // condition already holds, no need to wait
+                return;
+            }
+            latch = new CountDownLatch(1);
+            waitConditions.put(latch, pred);
+        }
+        // wait outside the lock so that other threads can modify the sequence
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            // nobody is waiting on this latch anymore, so drop the condition instead of leaking it
+            synchronized (this) {
+                waitConditions.remove(latch);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Tests every registered wait condition against the current sequence and, for each one that holds, counts down
+     * its latch and removes it from the registered conditions.
+     */
+    private void evaluateConditions() {
+        synchronized (this) {
+            waitConditions.entrySet().removeIf(waiter -> {
+                boolean satisfied = waiter.getValue().test(sequence);
+                if (satisfied) {
+                    // release the thread blocked on this latch before the entry is dropped
+                    waiter.getKey().countDown();
+                }
+                return satisfied;
+            });
+        }
+    }
+
     /**
      * Returns this resequencer's timeout value.
      *
@@ -214,6 +255,9 @@ public class ResequencerEngine<E> {
         if (!successorOfLastDelivered(element) && 
sequence.predecessor(element) == null) {
             element.schedule(defineTimeout());
         }
+
+        // evaluate wait conditions
+        evaluateConditions();
     }
 
     /**
@@ -240,7 +284,7 @@ public class ResequencerEngine<E> {
      * @throws Exception thrown by {@link SequenceSender#sendElement(Object)}.
      *
      */
-    public boolean deliverNext() throws Exception {
+    public synchronized boolean deliverNext() throws Exception {
         if (sequence.isEmpty()) {
             return false;
         }
@@ -261,6 +305,9 @@ public class ResequencerEngine<E> {
         // deliver the sequence element
         sequenceSender.sendElement(element.getObject());
 
+        // evaluate wait conditions
+        evaluateConditions();
+
         // element has been delivered
         return true;
     }

Reply via email to