This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 32895ccfeae CAMEL-21614: camel-core - Prevent cache change miss on
queue swap (#16809)
32895ccfeae is described below
commit 32895ccfeae995497a275754dafcad38947447b7
Author: Nicolas Filotto <[email protected]>
AuthorDate: Wed Jan 15 09:16:33 2025 +0100
CAMEL-21614: camel-core - Prevent cache change miss on queue swap (#16809)
## Motivation
In some specific use cases, the eviction of the entries never ends
## Modifications:
* Improve the way to swap the queue of change to avoid the race condition
that could cause cache changes miss by leveraging a `ReadWriteLock` to only
prevent change additions during the swap
---
.../apache/camel/support/cache/SimpleLRUCache.java | 22 ++++++++++++++++++++--
1 file changed, 20 insertions(+), 2 deletions(-)
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
index b6996a3f215..7c1b76e601a 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
@@ -26,6 +26,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -49,6 +51,10 @@ public class SimpleLRUCache<K, V> extends
ConcurrentHashMap<K, V> {
* The flag indicating that an eviction process is in progress.
*/
private final AtomicBoolean eviction = new AtomicBoolean();
+ /**
+ * The lock to prevent the addition of changes during the swap of queue of
changes.
+ */
+ private final ReadWriteLock swapLock = new ReentrantReadWriteLock();
/**
* The maximum cache size.
*/
@@ -84,7 +90,13 @@ public class SimpleLRUCache<K, V> extends
ConcurrentHashMap<K, V> {
if (value == null) {
return null;
}
- lastChanges.get().add(Map.entry(key, value));
+ Entry<K, V> entry = Map.entry(key, value);
+ swapLock.readLock().lock();
+ try {
+ lastChanges.get().add(entry);
+ } finally {
+ swapLock.readLock().unlock();
+ }
return value;
}
@@ -269,7 +281,13 @@ public class SimpleLRUCache<K, V> extends
ConcurrentHashMap<K, V> {
*/
private void compressChanges() {
Deque<Entry<K, V>> newChanges = new ConcurrentLinkedDeque<>();
- Deque<Entry<K, V>> currentChanges = lastChanges.getAndSet(newChanges);
+ Deque<Entry<K, V>> currentChanges;
+ swapLock.writeLock().lock();
+ try {
+ currentChanges = lastChanges.getAndSet(newChanges);
+ } finally {
+ swapLock.writeLock().unlock();
+ }
Set<K> keys = new HashSet<>();
Entry<K, V> entry;
while ((entry = currentChanges.pollLast()) != null) {