This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit e0a7a34a9fac477db51946486ea0505e4df07369 Author: Guillaume Nodet <[email protected]> AuthorDate: Tue May 21 21:21:41 2024 +0200 Avoid busy-loop in aggregator processor --- .../processor/aggregate/AggregateProcessor.java | 52 ++++++++++++++++++++-- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index fae1fa6a712..2c75f512b54 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -124,6 +125,7 @@ public class AggregateProcessor extends AsyncProcessorSupport private Map<String, String> closedCorrelationKeys; private final Set<String> batchConsumerCorrelationKeys = new ConcurrentSkipListSet<>(); private final Set<String> inProgressCompleteExchanges = ConcurrentHashMap.newKeySet(); + private final WaitableInteger inProgressCount = new WaitableInteger(); private final Set<String> unconfirmedCompleteExchanges = ConcurrentHashMap.newKeySet(); private final Set<String> inProgressCompleteExchangesForRecoveryTask = ConcurrentHashMap.newKeySet(); private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<>(); @@ -836,6 +838,7 @@ public class AggregateProcessor extends AsyncProcessorSupport LOG.debug("Aggregation complete for correlation key {} sending aggregated exchange: {}", key, exchange); // add this as in progress before we submit the task + inProgressCount.increment(); inProgressCompleteExchanges.add(exchange.getExchangeId()); if (recoveryInProgress.get()) { inProgressCompleteExchangesForRecoveryTask.add(exchange.getExchangeId()); @@ -1219,6 +1222,7 @@ public class AggregateProcessor extends AsyncProcessorSupport // must remember to remove in progress when we failed inProgressCompleteExchanges.remove(exchangeId); + inProgressCount.decrement(); // do not remove redelivery state as we need it when we redeliver again later } @@ -1244,6 +1248,7 @@ public class AggregateProcessor extends AsyncProcessorSupport } finally { // must remember to remove in progress when we are complete inProgressCompleteExchanges.remove(exchangeId); + inProgressCount.decrement(); } } @@ -1698,15 +1703,14 @@ public class AggregateProcessor extends AsyncProcessorSupport int expected = forceCompletionOfAllGroups(); StopWatch watch = new StopWatch(); - while (!inProgressCompleteExchanges.isEmpty()) { + if (!inProgressCompleteExchanges.isEmpty()) { LOG.trace("Waiting for {} inflight exchanges to complete", getInProgressCompleteExchanges()); try { - Thread.sleep(100); + inProgressCount.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // break out as we got interrupted such as the JVM terminating LOG.warn("Interrupted while waiting for {} inflight exchanges to complete.", getInProgressCompleteExchanges()); - break; } } @@ -1723,6 +1727,7 @@ public class AggregateProcessor extends AsyncProcessorSupport // cleanup when shutting down inProgressCompleteExchanges.clear(); + inProgressCount.reset(); if (shutdownExecutorService) { camelContext.getExecutorServiceManager().shutdownNow(executorService); @@ -1877,4 +1882,45 @@ public class AggregateProcessor extends AsyncProcessorSupport return total; } + /** + * Synchronization class to avoid busy-loop when waiting for exchanges to be processed during shutdown. + */ + protected static final class WaitableInteger extends AbstractQueuedSynchronizer { + + // await for this integer to be equal to zero + public void await() throws InterruptedException { + acquireSharedInterruptibly(0); // the arg is not used, see below + } + + // decrement the integer + public void decrement() { + releaseShared(-1); + } + + // increment the integer + public void increment() { + releaseShared(1); + } + + // reset the integer to zero, this call won't trigger threads blocked on an await() call + public void reset() { + setState(0); + } + + // the arg is passed through from acquireSharedInterruptibly, but not used + protected int tryAcquireShared(int unused) { + return (getState() == 0) ? 1 : -1; + } + + // called from releaseShared, i.e. from increment() and decrement() + protected boolean tryReleaseShared(int releases) { + for (;;) { + int c = getState(); + int nextc = c + releases; + if (compareAndSetState(c, nextc)) + return nextc == 0; + } + } + } + }
