This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e0a7a34a9fac477db51946486ea0505e4df07369
Author: Guillaume Nodet <[email protected]>
AuthorDate: Tue May 21 21:21:41 2024 +0200

    Avoid busy-loop in aggregator processor
---
 .../processor/aggregate/AggregateProcessor.java    | 52 ++++++++++++++++++++--
 1 file changed, 49 insertions(+), 3 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index fae1fa6a712..2c75f512b54 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -124,6 +125,7 @@ public class AggregateProcessor extends AsyncProcessorSupport
     private Map<String, String> closedCorrelationKeys;
     private final Set<String> batchConsumerCorrelationKeys = new 
ConcurrentSkipListSet<>();
     private final Set<String> inProgressCompleteExchanges = 
ConcurrentHashMap.newKeySet();
+    private final WaitableInteger inProgressCount = new WaitableInteger();
     private final Set<String> unconfirmedCompleteExchanges = 
ConcurrentHashMap.newKeySet();
     private final Set<String> inProgressCompleteExchangesForRecoveryTask = 
ConcurrentHashMap.newKeySet();
     private final Map<String, RedeliveryData> redeliveryState = new 
ConcurrentHashMap<>();
@@ -836,6 +838,7 @@ public class AggregateProcessor extends AsyncProcessorSupport
         LOG.debug("Aggregation complete for correlation key {} sending 
aggregated exchange: {}", key, exchange);
 
         // add this as in progress before we submit the task
+        inProgressCount.increment();
         inProgressCompleteExchanges.add(exchange.getExchangeId());
         if (recoveryInProgress.get()) {
             
inProgressCompleteExchangesForRecoveryTask.add(exchange.getExchangeId());
@@ -1219,6 +1222,7 @@ public class AggregateProcessor extends AsyncProcessorSupport
 
             // must remember to remove in progress when we failed
             inProgressCompleteExchanges.remove(exchangeId);
+            inProgressCount.decrement();
             // do not remove redelivery state as we need it when we redeliver 
again later
         }
 
@@ -1244,6 +1248,7 @@ public class AggregateProcessor extends AsyncProcessorSupport
             } finally {
                 // must remember to remove in progress when we are complete
                 inProgressCompleteExchanges.remove(exchangeId);
+                inProgressCount.decrement();
             }
         }
 
@@ -1698,15 +1703,14 @@ public class AggregateProcessor extends AsyncProcessorSupport
         int expected = forceCompletionOfAllGroups();
 
         StopWatch watch = new StopWatch();
-        while (!inProgressCompleteExchanges.isEmpty()) {
+        if (!inProgressCompleteExchanges.isEmpty()) {
             LOG.trace("Waiting for {} inflight exchanges to complete", 
getInProgressCompleteExchanges());
             try {
-                Thread.sleep(100);
+                inProgressCount.await();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 // break out as we got interrupted such as the JVM terminating
                 LOG.warn("Interrupted while waiting for {} inflight exchanges 
to complete.", getInProgressCompleteExchanges());
-                break;
             }
         }
 
@@ -1723,6 +1727,7 @@ public class AggregateProcessor extends AsyncProcessorSupport
 
         // cleanup when shutting down
         inProgressCompleteExchanges.clear();
+        inProgressCount.reset();
 
         if (shutdownExecutorService) {
             
camelContext.getExecutorServiceManager().shutdownNow(executorService);
@@ -1877,4 +1882,45 @@ public class AggregateProcessor extends AsyncProcessorSupport
         return total;
     }
 
+    /**
+     * Synchronization class to avoid busy-loop when waiting for exchanges to be processed during shutdown.
+     * <p>
+     * The {@link AbstractQueuedSynchronizer} state is used as a signed counter. {@link #increment()} and
+     * {@link #decrement()} add +1 / -1 to it by calling {@code releaseShared} with a signed delta, and
+     * {@link #await()} blocks (interruptibly) until the counter is observed to be zero.
+     * <p>
+     * {@code tryReleaseShared} reports a successful release only when the updated counter is exactly zero, so
+     * blocked {@link #await()} callers are only woken on a transition to zero. {@link #reset()} writes zero
+     * directly with {@code setState} and does not go through that path.
+     * <p>
+     * NOTE(review): the counter is not clamped at zero, so an unbalanced {@link #decrement()} (for example one
+     * arriving after {@link #reset()}) leaves a negative value - confirm that callers keep increment/decrement
+     * calls balanced.
+     */
+    protected static final class WaitableInteger extends 
AbstractQueuedSynchronizer {
+
+        // block the calling thread until the integer is observed to be zero; the wait is interruptible
+        public void await() throws InterruptedException {
+            acquireSharedInterruptibly(0); // the arg is ignored by tryAcquireShared
+        }
+
+        // decrement the integer: applies a delta of -1 through releaseShared
+        public void decrement() {
+            releaseShared(-1);
+        }
+
+        // increment the integer: applies a delta of +1 through releaseShared
+        public void increment() {
+            releaseShared(1);
+        }
+
+        // reset the integer to zero, this call won't trigger threads blocked 
on an await() call
+        public void reset() {
+            setState(0);
+        }
+
+        // the arg is passed through from acquireSharedInterruptibly, but not 
used
+        protected int tryAcquireShared(int unused) {
+            return (getState() == 0) ? 1 : -1; // positive: acquired because the count is zero; negative: caller must wait
+        }
+
+        // called from releaseShared, i.e. from increment() and decrement(); 'releases' is the signed delta (+1 / -1)
+        protected boolean tryReleaseShared(int releases) {
+            for (;;) { // compare-and-set retry loop: re-read the state if another thread changed it in between
+                int c = getState();
+                int nextc = c + releases;
+                if (compareAndSetState(c, nextc))
+                    return nextc == 0; // report a release only when the new count is exactly zero
+            }
+        }
+    }
+
 }

Reply via email to