This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch essobedo/CAMEL-22176/blocking-eviction-4.10
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 51ad76c626f0b04f3191091e7c5de14ca400a024
Author: Nicolas Filotto <[email protected]>
AuthorDate: Wed Jun 18 13:19:50 2025 +0200

    CAMEL-22176: camel-core - make the cache eviction blocking
---
 .../apache/camel/support/cache/SimpleLRUCache.java | 89 ++++++++++++----------
 1 file changed, 49 insertions(+), 40 deletions(-)

diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
index 5c3e79be57f..29f1884a9e4 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
@@ -36,8 +36,8 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
- * {@code SimpleLRUCache} is a simple implementation of a cache of type Least Recently Used . The implementation doesn't
- * accept null values. Generally speaking, the parameters of all the public methods must have a value otherwise a
+ * {@code SimpleLRUCache} is a simple implementation of a Least Recently Used cache. The implementation doesn't accept
+ * null values. Generally speaking, the parameters of all the public methods must have a value otherwise a
  * {@code NullPointerException} is thrown.
  *
  * @param <K> type of the key
@@ -47,7 +47,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> {
 
     static final float DEFAULT_LOAD_FACTOR = 0.75f;
     /**
-     * The minimum size of the queue of changes.
+     * The minimum size of the changes.
      */
     static final int MINIMUM_QUEUE_SIZE = 128;
     /**
@@ -55,7 +55,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> {
      */
     private final AtomicBoolean eviction = new AtomicBoolean();
     /**
-     * The lock to prevent the addition of changes during the swap of queue of changes or the cache cleaning.
+     * The lock to prevent the addition of changes during the swap of changes or the cache cleaning.
      */
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
     /**
@@ -68,7 +68,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> {
     private final AtomicReference<Deque<Entry<K, ValueHolder<V>>>> lastChanges
             = new AtomicReference<>(new ConcurrentLinkedDeque<>());
     /**
-     * The total amount of changes recorded.
+     * The total number of changes recorded.
      */
     private final AtomicInteger totalChanges = new AtomicInteger();
     /**
@@ -339,7 +339,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> {
     }
 
     /**
-     * @return the size of the queue of changes.
+     * @return the size of the changes.
      */
     int getQueueSize() {
         return totalChanges.get();
@@ -364,8 +364,8 @@ public class SimpleLRUCache<K, V> implements Map<K, V> {
     }
 
     /**
-     * Indicates whether the size of the queue of changes exceeds the maximum allowed size which is the max value
-     * between {@link #MINIMUM_QUEUE_SIZE} and {@code 2 * maximumCacheSize}.
+     * Indicates whether the size of the changes exceeds the maximum allowed size which is the max value between
+     * {@link #MINIMUM_QUEUE_SIZE} and {@code 2 * maximumCacheSize}.
      *
      * @return {@code true} if the queue is full, {@code false} otherwise.
      */
@@ -386,28 +386,51 @@ public class SimpleLRUCache<K, V> implements Map<K, V> {
      * Removes duplicates from the queue of changes if the queue is full.
      */
     private void compressChangesIfNeeded() {
-        Deque<Entry<K, ValueHolder<V>>> newChanges;
-        Deque<Entry<K, ValueHolder<V>>> currentChanges;
+        if (isQueueFull()) {
+            Deque<Entry<K, ValueHolder<V>>> newChanges = new ConcurrentLinkedDeque<>();
+            Deque<Entry<K, ValueHolder<V>>> currentChanges = lastChanges.getAndSet(newChanges);
+            Set<K> keys = new HashSet<>();
+            Entry<K, ValueHolder<V>> entry;
+            while ((entry = currentChanges.pollLast()) != null) {
+                if (keys.add(entry.getKey())) {
+                    newChanges.addFirst(entry);
+                }
+            }
+            totalChanges.set(keys.size());
+        }
+    }
+
+    /**
+     * Checks whether an eviction is needed and no eviction process is yet in progress, and if so, it calls the eviction
+     * process.
+     */
+    private void callEvictionIfNeeded() {
+        if (evictionNeeded() && eviction.compareAndSet(false, true)) {
+            try {
+                callEviction();
+            } finally {
+                eviction.set(false);
+            }
+        }
+    }
+
+    /**
+     * Evicts the oldest entries from the cache until the cache is not full anymore. This process is blocking to ensure
+     * that no changes are added to the cache while the eviction is in progress, to prevent infinite eviction.
+     */
+    private void callEviction() {
         lock.writeLock().lock();
         try {
-            if (isQueueFull()) {
-                newChanges = new ConcurrentLinkedDeque<>();
-                totalChanges.set(0);
-                currentChanges = lastChanges.getAndSet(newChanges);
-            } else {
-                return;
+            compressChangesIfNeeded();
+            while (isCacheFull()) {
+                Entry<K, ValueHolder<V>> oldest = nextOldestChange();
+                if (delegate.remove(oldest.getKey(), oldest.getValue())) {
+                    evict.accept(oldest.getValue().get());
+                }
             }
         } finally {
             lock.writeLock().unlock();
         }
-        Set<K> keys = new HashSet<>();
-        Entry<K, ValueHolder<V>> entry;
-        while ((entry = currentChanges.pollLast()) != null) {
-            if (keys.add(entry.getKey())) {
-                newChanges.addFirst(entry);
-                totalChanges.incrementAndGet();
-            }
-        }
     }
 
     private ValueHolder<V> newValue(V value) {
@@ -419,7 +442,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> {
     }
 
     /**
-     * The internal context of all write operations.
+     * The internal context of all write operations.
      */
     private static class OperationContext<K, V> implements AutoCloseable {
         /**
@@ -444,21 +467,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> {
         @Override
         public void close() {
             cache.lock.readLock().unlock();
-            if (cache.evictionNeeded() && cache.eviction.compareAndSet(false, true)) {
-                try {
-                    do {
-                        cache.compressChangesIfNeeded();
-                        if (cache.isCacheFull()) {
-                            Entry<K, ValueHolder<V>> oldest = cache.nextOldestChange();
-                            if (cache.delegate.remove(oldest.getKey(), oldest.getValue())) {
-                                cache.evict.accept(oldest.getValue().get());
-                            }
-                        }
-                    } while (cache.evictionNeeded());
-                } finally {
-                    cache.eviction.set(false);
-                }
-            }
+            cache.callEvictionIfNeeded();
         }
     }
 

Reply via email to