[ https://issues.apache.org/jira/browse/CAMEL-22176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17982705#comment-17982705 ]
Andre edited comment on CAMEL-22176 at 6/18/25 1:56 PM:
--------------------------------------------------------

Unfortunately I do not yet have a minimal example, but maybe some more insights. Based on current main, I adjusted the implementation as follows:

{noformat}
private boolean evictionNeeded() {
    if (isCacheFull()) {
        LOG.warn("evicitionNeeded as cache is full {} > {} (queue size: {} > {})", size(), maximumCacheSize,
                getQueueSize(), Math.max(2 * maximumCacheSize, MINIMUM_QUEUE_SIZE));
        return true;
    }
    if (isQueueFull()) {
        LOG.warn("evicitionNeeded as queue is full {} > {}", getQueueSize(),
                Math.max(2 * maximumCacheSize, MINIMUM_QUEUE_SIZE));
        return true;
    }
    return false;
}

/**
 * Removes duplicates from the queue of changes if the queue is full.
 */
private void compressChangesIfNeeded() {
    Deque<Entry<K, ValueHolder<V>>> newChanges;
    Deque<Entry<K, ValueHolder<V>>> currentChanges;
    try {
        long start = System.currentTimeMillis();
        LOG.warn("cCiN lock: acquiring");
        lock.writeLock().lock();
        LOG.warn("cCiN lock: acquired (after {} ms)", System.currentTimeMillis() - start);
        if (isQueueFull()) {
            newChanges = new ConcurrentLinkedDeque<>();
            totalChanges.set(0);
            currentChanges = lastChanges.getAndSet(newChanges);
        } else {
            return;
        }
    } finally {
        long start = System.currentTimeMillis();
        LOG.warn("cCiN lock: releasing");
        lock.writeLock().unlock();
        LOG.warn("cCiN lock: released (after {} ms)", System.currentTimeMillis() - start);
    }
    Set<K> keys = new HashSet<>(Math.max(2 * maximumCacheSize, MINIMUM_QUEUE_SIZE));
    Entry<K, ValueHolder<V>> entry;
    while ((entry = currentChanges.pollLast()) != null) {
        LOG.debug("readding key {}", entry.getKey());
        if (keys.add(entry.getKey())) {
            newChanges.addFirst(entry);
            totalChanges.incrementAndGet();
        }
    }
    LOG.warn("readded {} keys", totalChanges.get());
}

@Override
public void close() {
    cache.lock.readLock().unlock();
    if (cache.evictionNeeded() && cache.eviction.compareAndSet(false, true)) {
        try {
            do {
                LOG.warn("closing operationcontext as {}", cache.evictionNeeded());
                cache.compressChangesIfNeeded();
                LOG.warn("cache is full? {}", cache.isCacheFull());
                if (cache.isCacheFull()) {
                    Entry<K, ValueHolder<V>> oldest = cache.nextOldestChange();
                    if (cache.delegate.remove(oldest.getKey(), oldest.getValue())) {
                        cache.evict.accept(oldest.getValue().get());
                    }
                } else {
                    LOG.warn("cache is not full! eviction? {}", cache.evictionNeeded());
                }
            } while (cache.evictionNeeded());
            LOG.warn("cache eviction completed");
        } finally {
            cache.eviction.set(false);
        }
    }
}
{noformat}

I then started my service, which consumes from {{file}} and produces to {{sftp}}:

{noformat}
From file:///home/my/data?bridgeErrorHandler=true&delay=60000&idempotent=true&idempotentKey=${file:absolute.path}-${file:modified}&idempotentRepository=#MongoDbIdempotentRepository&inProgressRepository=#InProgressRepository&noop=true&readLock=changed&readLockCheckInterval=2000ms&readLockMinAge=1000ms&readLockTimeout=4000ms&recursive=true
To   sftp://user@1.2.3.4:22/path/to/my/data
{noformat}

I then produced 10k files like this:

{noformat}
for i in {1..10000}; do NUM=$(od -An -N8 -tu8 < /dev/urandom | tr -d ' '); echo "$NUM" > "$NUM"; done
{noformat}

Each file is then transferred by the route; see my service log:

{noformat}
2025-06-18T14:55:15.411+0200
# 21483 times the operation context did cache eviction, i.e. "cache eviction completed"
$ cat log_01.log | grep -i closing | wc -l
21483

# only 3 files had been archived by 2025-06-18T14:56:13.356+0200 (i.e. a minute later)
$ cat log_01.log | grep -i "archived file" | wc -l
3

# another 22479 had been archived by 2025-06-18T15:01:43.344+0200
$ cat log_00.log | grep -i "archived file" | wc -l
22479

# nothing until 2025-06-18T15:02:57.144+0200 (over a minute)
evicitionNeeded as cache is full 1001 > 1000 (queue size: 2001 > 2000)
...

# this time everything eventually succeeded, yet cache eviction was called 28008 times and produced quite high CPU usage.
{noformat}
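To make the cost visible outside of Camel, here is a small self-contained model of the pattern instrumented above (not the actual {{SimpleLRUCache}}; the class and all names below are hypothetical): a change queue with capacity 2 * cacheSize that is deduplicated whenever it overflows, plus a stand-in for the eviction done in {{close()}}. With unique keys the dedup pass removes nothing, so every insert past the capacity pays a full scan of roughly 2,000 entries, about 16 million iterations for 10k files at cache size 1,000, which is consistent with the high CPU usage shown in the log above.

{noformat}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

public class CompressCostModel {
    public static void main(String[] args) {
        final int cacheSize = 1_000;
        final int queueCapacity = 2 * cacheSize; // mirrors Math.max(2 * maximumCacheSize, MINIMUM_QUEUE_SIZE)
        final int inserts = 10_000;              // 10k files, as in the reproduction above

        Deque<Long> changes = new ArrayDeque<>();
        long compressions = 0;
        long scanned = 0;

        for (long key = 0; key < inserts; key++) {    // every key unique, like the random file names
            changes.addLast(key);
            while (changes.size() > queueCapacity) {  // "queue is full" -> compress
                compressions++;
                Set<Long> seen = new HashSet<>();
                Deque<Long> kept = new ArrayDeque<>();
                for (Long e : changes) {
                    scanned++;
                    if (seen.add(e)) {                // duplicates would be dropped here,
                        kept.addLast(e);              // but with unique keys nothing is dropped
                    }
                }
                changes = kept;
                changes.pollFirst();                  // stand-in for the eviction done in close()
            }
        }
        System.out.printf("compressions=%d, queue entries scanned=%d%n", compressions, scanned);
        // prints: compressions=8000, queue entries scanned=16008000
    }
}
{noformat}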
> High CPU Usage and Deadlock/Race Condition (?) in SimpleLRUCache after Upgrade (>4.8.2)
> ---------------------------------------------------------------------------------------
>
>                 Key: CAMEL-22176
>                 URL: https://issues.apache.org/jira/browse/CAMEL-22176
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 4.8.3
>            Reporter: Andre
>            Priority: Major
>
> *Summary:*
> After upgrading Apache Camel to a version greater than 4.8.2, we are experiencing significant CPU usage spikes, potential deadlocks, or race conditions. The issue appears to be related to the changes introduced in CAMEL-21888, specifically affecting the {{compressChangesIfNeeded()}} method in {{SimpleLRUCache}}.
>
> *Environment:*
> * Apache Camel version: 4.8.x (issue persists in 4.12.0)
> * Previous working version: 4.8.2
> * Large file processing: thousands of files from a file input to SFTP
> * Cache size: 2,000
> * Files processed per run: ~10,000
>
> *Description:*
> Following the upgrade, we observed that when processing large numbers of files (e.g., 1,000 files per run), the {{lastChanges}} queue in {{SimpleLRUCache}} fills up rapidly. For each file, the {{OperationContext}} is invoked, which drains and rebuilds the queue. Since our workload rarely has duplicates, this results in a high number of iterations: potentially 50 million for 1,000 files and a 50,000 cache size, since each pass rescans a queue on the order of the cache size.
>
> This behavior leads to:
> * *High CPU usage* (sometimes reaching 100%)
> * *Application startup delays or failures*
> * *Potential deadlocks or race conditions* due to excessive locking or queue operations
>
> This appears to be a regression caused by the change in {{SimpleLRUCache}}.
>
> *Steps to Reproduce:*
> # Upgrade Camel to a version >4.8.2.
> # Configure a route that processes thousands of files (e.g., from a directory to an SFTP endpoint); a minimal route sketch follows at the end of this message.
> # Set the cache size to a high value (e.g., 50,000).
> # Observe CPU usage and application responsiveness during processing.
>
> *Expected Behavior:*
> Efficient cache management without excessive CPU usage or risk of deadlocks, even with large numbers of files and a high cache size.
> *Actual Behavior:*
> * CPU usage spikes to 100% during processing.
> * Application may fail to start or becomes unresponsive.
> * Large number of iterations in {{compressChangesIfNeeded()}} due to the way {{lastChanges}} is managed.
>
> *Suspected Root Cause:*
> The change to use a {{Deque}} for {{lastChanges}}, together with the logic in {{compressChangesIfNeeded()}}, is not efficient for high-throughput scenarios with large caches and frequent operations, especially when there are few or no duplicates.
>
> *Notes:*
> * We noticed the regression in two independent services after upgrading Camel, both with strong indications that the LRU cache is the problem.
> * -After changing {{compressChangesIfNeeded()}} to a {{noop}}, the CPU usage and deadlock/race condition issues are gone.-

-- This message was sent by Atlassian Jira (v8.20.10#820010)
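For reference, a minimal standalone reproducer following the steps above might look like the sketch below. The directory paths are placeholders, and the in-memory idempotent repository and local file target stand in for the MongoDB-backed repository and SFTP endpoint of the original route; {{Main}} is from camel-main and {{MemoryIdempotentRepository}} from camel-support.

{noformat}
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.main.Main;
import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;

/**
 * Minimal reproducer sketch: consume thousands of files with an idempotent
 * file consumer, as in the reported route. All paths are placeholders.
 */
public class Camel22176Reproducer {
    public static void main(String[] args) throws Exception {
        Main main = new Main();
        // in-memory stand-in for the MongoDB-backed idempotent repository
        main.bind("idempotentRepo", MemoryIdempotentRepository.memoryIdempotentRepository(50_000));
        main.configure().addRoutesBuilder(new RouteBuilder() {
            @Override
            public void configure() {
                from("file:/tmp/camel-22176-in?noop=true&recursive=true"
                        + "&idempotent=true"
                        + "&idempotentKey=${file:absolute.path}-${file:modified}"
                        + "&idempotentRepository=#idempotentRepo"
                        + "&readLock=changed&readLockMinAge=1000ms")
                        // local file target stands in for the sftp endpoint
                        .to("file:/tmp/camel-22176-out");
            }
        });
        main.run();
    }
}
{noformat}

Fill {{/tmp/camel-22176-in}} with the {{od}} one-liner from the comment above, then watch CPU usage while the route drains the directory.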