[ 
https://issues.apache.org/jira/browse/CAMEL-22176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17982705#comment-17982705
 ] 

Andre edited comment on CAMEL-22176 at 6/18/25 1:56 PM:
--------------------------------------------------------

Unfortunately I do not yet have a minimal example, but here are some more insights.

I adjusted the implementation - based on current main - as follows:

{noformat}

    private boolean evictionNeeded() {
        if (isCacheFull()) {
            LOG.warn("evictionNeeded as cache is full {} > {} (queue size: {} > {})", size(),
                    maximumCacheSize, getQueueSize(), Math.max(2 * maximumCacheSize, MINIMUM_QUEUE_SIZE));
            return true;
        }

        if (isQueueFull()) {
            LOG.warn("evictionNeeded as queue is full {} > {}", getQueueSize(),
                    Math.max(2 * maximumCacheSize, MINIMUM_QUEUE_SIZE));
            return true;
        }

        return false;
    }

    /**
     * Removes duplicates from the queue of changes if the queue is full.
     */
    private void compressChangesIfNeeded() {
        Deque<Entry<K, ValueHolder<V>>> newChanges;
        Deque<Entry<K, ValueHolder<V>>> currentChanges;
        try {
            long start = System.currentTimeMillis();
            LOG.warn("cCiN lock: acquiring");
            lock.writeLock().lock();
            LOG.warn("cCiN lock: acquired (after {} ms)", System.currentTimeMillis() - start);
            if (isQueueFull()) {
                // swap in a fresh queue while holding the write lock
                newChanges = new ConcurrentLinkedDeque<>();
                totalChanges.set(0);
                currentChanges = lastChanges.getAndSet(newChanges);
            } else {
                return;
            }
        } finally {
            long start = System.currentTimeMillis();
            LOG.warn("cCiN lock: releasing");
            lock.writeLock().unlock();
            LOG.warn("cCiN lock: released (after {} ms)", System.currentTimeMillis() - start);
        }
        // re-add only the newest change per key; with no duplicates this
        // walks and re-adds the entire queue
        Set<K> keys = new HashSet<>(Math.max(2 * maximumCacheSize, MINIMUM_QUEUE_SIZE));
        Entry<K, ValueHolder<V>> entry;
        while ((entry = currentChanges.pollLast()) != null) {
            LOG.debug("readding key {}", entry.getKey());
            if (keys.add(entry.getKey())) {
                newChanges.addFirst(entry);
                totalChanges.incrementAndGet();
            }
        }
        LOG.warn("readded {} keys", totalChanges.get());
    }

        @Override
        public void close() {
            cache.lock.readLock().unlock();
            if (cache.evictionNeeded() && cache.eviction.compareAndSet(false, true)) {
                try {
                    do {
                        LOG.warn("closing OperationContext as {}", cache.evictionNeeded());
                        cache.compressChangesIfNeeded();
                        LOG.warn("cache is full? {}", cache.isCacheFull());
                        if (cache.isCacheFull()) {
                            Entry<K, ValueHolder<V>> oldest = cache.nextOldestChange();
                            if (cache.delegate.remove(oldest.getKey(), oldest.getValue())) {
                                cache.evict.accept(oldest.getValue().get());
                            }
                        } else {
                            LOG.warn("cache is not full! eviction? {}", cache.evictionNeeded());
                        }
                    } while (cache.evictionNeeded());
                    LOG.warn("cache eviction completed");
                } finally {
                    cache.eviction.set(false);
                }
            }
        }
{noformat}
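
For reference, the thresholds showing up in my logs below come from {{Math.max(2 * maximumCacheSize, MINIMUM_QUEUE_SIZE)}}. A minimal sketch of that arithmetic, assuming a placeholder value for {{MINIMUM_QUEUE_SIZE}} (I have not checked the real constant in {{SimpleLRUCache}}):

{noformat}
// Sketch only: MINIMUM_QUEUE_SIZE = 1000 is an assumed placeholder,
// not necessarily the real constant from SimpleLRUCache.
public class QueueThresholdSketch {
    static final int MINIMUM_QUEUE_SIZE = 1000; // assumption

    public static void main(String[] args) {
        int maximumCacheSize = 1000; // matches "1001 > 1000" in my logs
        int queueLimit = Math.max(2 * maximumCacheSize, MINIMUM_QUEUE_SIZE);
        System.out.println(queueLimit); // prints 2000, matching "2001 > 2000"
    }
}
{noformat}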

I then started my service, which consumes from a {{file}} endpoint and produces to an {{sftp}} endpoint:
{noformat}
From file:///home/my/data?bridgeErrorHandler=true&delay=60000&idempotent=true&idempotentKey=${file:absolute.path}-${file:modified}&idempotentRepository=#MongoDbIdempotentRepository&inProgressRepository=#InProgressRepository&noop=true&readLock=changed&readLockCheckInterval=2000ms&readLockMinAge=1000ms&readLockTimeout=4000ms&recursive=true
To sftp://user@1.2.3.4:22/path/to/my/data
{noformat}
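
For illustration, an approximate Java DSL equivalent of that route (a sketch of my setup, not the exact service code; the two repository beans are assumed to be registered under those names):

{noformat}
// Approximate Java DSL equivalent of the route above. The
// MongoDbIdempotentRepository and InProgressRepository beans are
// assumed to be bound in the registry under these names.
import org.apache.camel.builder.RouteBuilder;

public class FileToSftpRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("file:///home/my/data"
                + "?bridgeErrorHandler=true&delay=60000&idempotent=true"
                + "&idempotentKey=${file:absolute.path}-${file:modified}"
                + "&idempotentRepository=#MongoDbIdempotentRepository"
                + "&inProgressRepository=#InProgressRepository"
                + "&noop=true&readLock=changed&readLockCheckInterval=2000ms"
                + "&readLockMinAge=1000ms&readLockTimeout=4000ms&recursive=true")
            .to("sftp://user@1.2.3.4:22/path/to/my/data");
    }
}
{noformat}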

I then produced 10k files like this:

{noformat}
for i in {1..10000}; do
    NUM=$(od -An -N8 -tu8 < /dev/urandom | tr -d ' ')
    echo "$NUM" > "$NUM"
done
{noformat}

Now each file is transferred by the route; see my service log:

{noformat}
2025-06-18T14:55:15.411+0200
# 21483 times the operation context did cache eviction, i.e. "cache eviction completed"
$ cat log_01.log | grep -i closing | wc -l
21483

# only 3 files had been archived by 2025-06-18T14:56:13.356+0200 (i.e. a minute later)
$ cat log_01.log | grep -i "archived file" | wc -l
3

# another 22479 had been archived by 2025-06-18T15:01:43.344+0200
$ cat log_00.log | grep -i "archived file" | wc -l
22479

# nothing until 2025-06-18T15:02:57.144+0200 (over a minute)
evictionNeeded as cache is full 1001 > 1000 (queue size: 2001 > 2000)

...

# this time everything succeeded eventually, yet cache eviction was called
# 28008 times and produced quite high CPU usage.
{noformat}
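
To illustrate where the iterations go when there are few duplicates, here is a small standalone sketch (my own approximation of the drain-and-readd loop above, not the actual Camel code): with unique keys, every compression pass visits the entire queue and keeps every entry, so nothing is actually compressed away.

{noformat}
// Standalone approximation of the compression loop above: with 10,000
// unique keys the pass performs 10,000 iterations and removes nothing.
import java.util.AbstractMap.SimpleEntry;
import java.util.Deque;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;

public class CompressionCostSketch {
    public static void main(String[] args) {
        Deque<Entry<String, String>> current = new ConcurrentLinkedDeque<>();
        for (int i = 0; i < 10_000; i++) {
            current.addLast(new SimpleEntry<>("key-" + i, "value-" + i));
        }
        Deque<Entry<String, String>> compressed = new ConcurrentLinkedDeque<>();
        Set<String> keys = new HashSet<>();
        int iterations = 0;
        Entry<String, String> entry;
        while ((entry = current.pollLast()) != null) {
            iterations++;
            if (keys.add(entry.getKey())) { // always true without duplicates
                compressed.addFirst(entry);
            }
        }
        System.out.println(iterations + " iterations, " + compressed.size() + " entries kept");
    }
}
{noformat}

Repeated ~28008 times, as in the run above, that adds up to a lot of CPU for zero effective compression.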


> High CPU Usage and Deadlock/Race Condition (?) in SimpleLRUCache after 
> Upgrade (>4.8.2)
> ---------------------------------------------------------------------------------------
>
>                 Key: CAMEL-22176
>                 URL: https://issues.apache.org/jira/browse/CAMEL-22176
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 4.8.3
>            Reporter: Andre
>            Priority: Major
>
> *Summary:*
> After upgrading Apache Camel to a version greater than 4.8.2, we are
> experiencing significant CPU usage spikes and potential deadlocks or race
> conditions. The issue appears to be related to the changes introduced in
> CAMEL-21888, specifically affecting the {{compressChangesIfNeeded()}} method
> in {{SimpleLRUCache}}.
> *Environment:*
>  * Apache Camel version: 4.8.x (issue persists in 4.12.0)
>  * Previous working version: 4.8.2
>  * Large file processing: thousands of files from a file input to SFTP
>  * Cache size: 2,000
>  * Files processed per run: ~10,000
> *Description:*
> Following the upgrade, we observed that when processing large numbers of
> files (e.g., 1,000 files per run), the {{lastChanges}} queue in
> {{SimpleLRUCache}} fills up rapidly. For each file, the {{OperationContext}}
> is invoked, which drains and rebuilds the queue. Since our workload rarely
> has duplicates, this results in a high number of iterations: potentially 50
> million for 1,000 files and a 50,000 cache size (each pass can visit the
> full queue, so 1,000 * 50,000 = 50,000,000 entry visits).
> This behavior leads to:
>  * *High CPU usage* (sometimes reaching 100%)
>  * *Application startup delays or failures*
>  * *Potential deadlocks or race conditions* due to excessive locking or queue 
> operations
> This appears to be a regression caused by the change in {{SimpleLRUCache}}.
> *Steps to Reproduce:*
>  # Upgrade Camel to a version >4.8.2.
>  # Configure a route that processes thousands of files (e.g., from a 
> directory to an SFTP endpoint).
>  # Set the cache size to a high value (e.g., 50,000).
>  # Observe CPU usage and application responsiveness during processing.
> *Expected Behavior:*
> Efficient cache management without excessive CPU usage or risk of deadlocks, 
> even with large numbers of files and a high cache size.
> *Actual Behavior:*
>  * CPU usage spikes to 100% during processing.
>  * Application may fail to start or becomes unresponsive.
>  * Large number of iterations in {{compressChangesIfNeeded()}} due to the way 
> {{lastChanges}} is managed.
> *Suspected Root Cause:*
> The change to use a {{Deque}} for {{lastChanges}} and the logic in
> {{compressChangesIfNeeded()}} are not efficient for high-throughput scenarios
> with large caches and frequent operations, especially when there are few or
> no duplicates.
>  
> *Notes*
>  * We noticed the regression in two independent services after upgrading
> Camel, both strongly indicating that the LRUCache is the problem.
>  * -After changing {{compressChangesIfNeeded()}} to {{noop}} the CPU usage 
> and deadlock/race condition issues are gone-
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
