This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 32895ccfeae CAMEL-21614: camel-core - Prevent cache change miss on 
queue swap (#16809)
32895ccfeae is described below

commit 32895ccfeae995497a275754dafcad38947447b7
Author: Nicolas Filotto <[email protected]>
AuthorDate: Wed Jan 15 09:16:33 2025 +0100

    CAMEL-21614: camel-core - Prevent cache change miss on queue swap (#16809)
    
    ## Motivation
    
    In some specific use cases, the eviction of the entries never ends
    
    ## Modifications:
    
    * Improve the way to swap the queue of change to avoid the race condition 
that could cause cache changes miss by leveraging a `ReadWriteLock` to only 
prevent change additions during the swap
---
 .../apache/camel/support/cache/SimpleLRUCache.java | 22 ++++++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
index b6996a3f215..7c1b76e601a 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
@@ -26,6 +26,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -49,6 +51,10 @@ public class SimpleLRUCache<K, V> extends 
ConcurrentHashMap<K, V> {
      * The flag indicating that an eviction process is in progress.
      */
     private final AtomicBoolean eviction = new AtomicBoolean();
+    /**
+     * The lock to prevent the addition of changes during the swap of queue of 
changes.
+     */
+    private final ReadWriteLock swapLock = new ReentrantReadWriteLock();
     /**
      * The maximum cache size.
      */
@@ -84,7 +90,13 @@ public class SimpleLRUCache<K, V> extends 
ConcurrentHashMap<K, V> {
         if (value == null) {
             return null;
         }
-        lastChanges.get().add(Map.entry(key, value));
+        Entry<K, V> entry = Map.entry(key, value);
+        swapLock.readLock().lock();
+        try {
+            lastChanges.get().add(entry);
+        } finally {
+            swapLock.readLock().unlock();
+        }
         return value;
     }
 
@@ -269,7 +281,13 @@ public class SimpleLRUCache<K, V> extends 
ConcurrentHashMap<K, V> {
      */
     private void compressChanges() {
         Deque<Entry<K, V>> newChanges = new ConcurrentLinkedDeque<>();
-        Deque<Entry<K, V>> currentChanges = lastChanges.getAndSet(newChanges);
+        Deque<Entry<K, V>> currentChanges;
+        swapLock.writeLock().lock();
+        try {
+            currentChanges = lastChanges.getAndSet(newChanges);
+        } finally {
+            swapLock.writeLock().unlock();
+        }
         Set<K> keys = new HashSet<>();
         Entry<K, V> entry;
         while ((entry = currentChanges.pollLast()) != null) {

Reply via email to