This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch essobedo/CAMEL-22176/blocking-eviction-4.10 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 51ad76c626f0b04f3191091e7c5de14ca400a024 Author: Nicolas Filotto <[email protected]> AuthorDate: Wed Jun 18 13:19:50 2025 +0200 CAMEL-22176: camel-core - make the cache eviction blocking --- .../apache/camel/support/cache/SimpleLRUCache.java | 89 ++++++++++++---------- 1 file changed, 49 insertions(+), 40 deletions(-) diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java index 5c3e79be57f..29f1884a9e4 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java @@ -36,8 +36,8 @@ import java.util.function.Function; import java.util.stream.Collectors; /** - * {@code SimpleLRUCache} is a simple implementation of a cache of type Least Recently Used . The implementation doesn't - * accept null values. Generally speaking, the parameters of all the public methods must have a value otherwise a + * {@code SimpleLRUCache} is a simple implementation of a Least Recently Used cache. The implementation doesn't accept + * null values. Generally speaking, the parameters of all the public methods must have a value otherwise a * {@code NullPointerException} is thrown. * * @param <K> type of the key @@ -47,7 +47,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> { static final float DEFAULT_LOAD_FACTOR = 0.75f; /** - * The minimum size of the queue of changes. + * The minimum size of the changes. */ static final int MINIMUM_QUEUE_SIZE = 128; /** @@ -55,7 +55,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> { */ private final AtomicBoolean eviction = new AtomicBoolean(); /** - * The lock to prevent the addition of changes during the swap of queue of changes or the cache cleaning. + * The lock to prevent the addition of changes during the swap of changes or the cache cleaning. */ private final ReadWriteLock lock = new ReentrantReadWriteLock(); /** @@ -68,7 +68,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> { private final AtomicReference<Deque<Entry<K, ValueHolder<V>>>> lastChanges = new AtomicReference<>(new ConcurrentLinkedDeque<>()); /** - * The total amount of changes recorded. + * The total number of changes recorded. */ private final AtomicInteger totalChanges = new AtomicInteger(); /** @@ -339,7 +339,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> { } /** - * @return the size of the queue of changes. + * @return the size of the changes. */ int getQueueSize() { return totalChanges.get(); @@ -364,8 +364,8 @@ public class SimpleLRUCache<K, V> implements Map<K, V> { } /** - * Indicates whether the size of the queue of changes exceeds the maximum allowed size which is the max value - * between {@link #MINIMUM_QUEUE_SIZE} and {@code 2 * maximumCacheSize}. + * Indicates whether the size of the changes exceeds the maximum allowed size which is the max value between + * {@link #MINIMUM_QUEUE_SIZE} and {@code 2 * maximumCacheSize}. * * @return {@code true} if the queue is full, {@code false} otherwise. */ @@ -386,28 +386,51 @@ public class SimpleLRUCache<K, V> implements Map<K, V> { * Removes duplicates from the queue of changes if the queue is full. */ private void compressChangesIfNeeded() { - Deque<Entry<K, ValueHolder<V>>> newChanges; - Deque<Entry<K, ValueHolder<V>>> currentChanges; + if (isQueueFull()) { + Deque<Entry<K, ValueHolder<V>>> newChanges = new ConcurrentLinkedDeque<>(); + Deque<Entry<K, ValueHolder<V>>> currentChanges = lastChanges.getAndSet(newChanges); + Set<K> keys = new HashSet<>(); + Entry<K, ValueHolder<V>> entry; + while ((entry = currentChanges.pollLast()) != null) { + if (keys.add(entry.getKey())) { + newChanges.addFirst(entry); + } + } + totalChanges.set(keys.size()); + } + } + + /** + * Checks whether an eviction is needed and no eviction process is yet in progress, and if so, it calls the eviction + * process. + */ + private void callEvictionIfNeeded() { + if (evictionNeeded() && eviction.compareAndSet(false, true)) { + try { + callEviction(); + } finally { + eviction.set(false); + } + } + } + + /** + * Evicts the oldest entries from the cache until the cache is not full anymore. This process is blocking to ensure + * that no changes are added to the cache while the eviction is in progress, to prevent infinite eviction. + */ + private void callEviction() { lock.writeLock().lock(); try { - if (isQueueFull()) { - newChanges = new ConcurrentLinkedDeque<>(); - totalChanges.set(0); - currentChanges = lastChanges.getAndSet(newChanges); - } else { - return; + compressChangesIfNeeded(); + while (isCacheFull()) { + Entry<K, ValueHolder<V>> oldest = nextOldestChange(); + if (delegate.remove(oldest.getKey(), oldest.getValue())) { + evict.accept(oldest.getValue().get()); + } } } finally { lock.writeLock().unlock(); } - Set<K> keys = new HashSet<>(); - Entry<K, ValueHolder<V>> entry; - while ((entry = currentChanges.pollLast()) != null) { - if (keys.add(entry.getKey())) { - newChanges.addFirst(entry); - totalChanges.incrementAndGet(); - } - } } private ValueHolder<V> newValue(V value) { @@ -419,7 +442,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> { } /** - * The internal context of all write operations. + * The internal context of all writes operations. */ private static class OperationContext<K, V> implements AutoCloseable { /** @@ -444,21 +467,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> { @Override public void close() { cache.lock.readLock().unlock(); - if (cache.evictionNeeded() && cache.eviction.compareAndSet(false, true)) { - try { - do { - cache.compressChangesIfNeeded(); - if (cache.isCacheFull()) { - Entry<K, ValueHolder<V>> oldest = cache.nextOldestChange(); - if (cache.delegate.remove(oldest.getKey(), oldest.getValue())) { - cache.evict.accept(oldest.getValue().get()); - } - } - } while (cache.evictionNeeded()); - } finally { - cache.eviction.set(false); - } - } + cache.callEvictionIfNeeded(); } }
