This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch camel-4.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.4.x by this push:
new 1b3d20ff0d5 CAMEL-20850: camel-core - compress changes in
SimpleLRUCache (#14482)
1b3d20ff0d5 is described below
commit 1b3d20ff0d55d438040c9e2c084b4ca3f571ee23
Author: Nicolas Filotto <[email protected]>
AuthorDate: Tue Jun 11 19:16:48 2024 +0200
CAMEL-20850: camel-core - compress changes in SimpleLRUCache (#14482)
## Motivation
When the queue of changes is full, some entries can be evicted even though the
cache itself is not full, which is not expected.
## Modifications
* Compress the queue of changes before eviction by removing duplicates, keeping only the most recent change for each key
* Add a minimum size for the queue of changes (MINIMUM_QUEUE_SIZE = 128)
---
.../camel/support/cache/SimpleLRUCacheTest.java | 131 +++++++++++++++++----
.../apache/camel/support/cache/SimpleLRUCache.java | 69 +++++++++--
2 files changed, 168 insertions(+), 32 deletions(-)
diff --git
a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java
b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java
index d7e9f4549a5..c6464c71d09 100644
---
a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java
@@ -19,11 +19,17 @@ package org.apache.camel.support.cache;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import static org.apache.camel.support.cache.SimpleLRUCache.MINIMUM_QUEUE_SIZE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -56,30 +62,17 @@ class SimpleLRUCacheTest {
@Test
void queueSize() {
assertEquals(0, map.getQueueSize());
- map.put("1", "1");
+ for (int i = 1; i <= MINIMUM_QUEUE_SIZE; i++) {
+ map.put("1", Integer.toString(i));
+ assertEquals(1, map.size());
+ assertEquals(i, map.getQueueSize());
+ }
+ map.put("1", "A");
assertEquals(1, map.size());
assertEquals(1, map.getQueueSize());
- map.put("1", "2");
+ map.put("1", "B");
assertEquals(1, map.size());
assertEquals(2, map.getQueueSize());
- map.put("1", "3");
- assertEquals(1, map.size());
- assertEquals(3, map.getQueueSize());
- map.put("1", "4");
- assertEquals(1, map.size());
- assertEquals(4, map.getQueueSize());
- map.put("1", "5");
- assertEquals(1, map.size());
- assertEquals(5, map.getQueueSize());
- map.put("1", "6");
- assertEquals(1, map.size());
- assertEquals(6, map.getQueueSize());
- map.put("1", "7");
- assertEquals(1, map.size());
- assertEquals(6, map.getQueueSize());
- map.put("1", "8");
- assertEquals(1, map.size());
- assertEquals(6, map.getQueueSize());
}
@Test
@@ -287,4 +280,102 @@ class SimpleLRUCacheTest {
assertEquals(3, map.size());
assertEquals(0, consumed.size());
}
+
+ @Test
+ void ignoreDuplicates() {
+ assertEquals(0, map.size());
+ for (int i = 0; i < 100; i++) {
+ map.put("1", Integer.toString(i));
+ assertEquals(1, map.size(), String.format("The expected size is 1
but it fails after %d puts", i + 1));
+ }
+ assertEquals("99", map.get("1"));
+ assertNull(map.put("2", "Two"));
+ assertEquals(2, map.size());
+ assertEquals("99", map.get("1"));
+ assertNull(map.put("3", "Three"));
+ assertEquals(3, map.size());
+ assertEquals(0, consumed.size());
+ assertEquals("99", map.get("1"));
+ assertNull(map.put("4", "Four"));
+ assertEquals(3, map.size());
+ assertEquals(1, consumed.size());
+ assertFalse(map.containsKey("1"));
+ assertTrue(consumed.contains("99"));
+ }
+
+ @Test
+ void ensureEvictionOrdering() {
+ assertEquals(0, map.size());
+ assertNull(map.put("1", "One"));
+ assertNotNull(map.put("1", "One"));
+ assertNotNull(map.put("1", "One"));
+ assertNotNull(map.put("1", "One"));
+ assertNotNull(map.put("1", "One"));
+ assertNotNull(map.put("1", "One"));
+ assertNull(map.put("2", "Two"));
+ assertNotNull(map.put("1", "One"));
+ assertNull(map.put("3", "Three"));
+ assertEquals(3, map.size());
+ assertNull(map.put("4", "Four"));
+ assertEquals(3, map.size());
+ assertEquals(1, consumed.size());
+ assertFalse(map.containsKey("2"));
+ assertTrue(consumed.contains("Two"));
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = { 1, 2, 5, 10, 20, 50, 100, 1_000 })
+ void concurrentPut(int maximumCacheSize) throws Exception {
+ int threads = Runtime.getRuntime().availableProcessors();
+ int totalKeysPerThread = 1_000;
+ AtomicInteger counter = new AtomicInteger();
+ SimpleLRUCache<String, String> cache = new SimpleLRUCache<>(16,
maximumCacheSize, v -> counter.incrementAndGet());
+ CountDownLatch latch = new CountDownLatch(threads);
+ for (int i = 0; i < threads; i++) {
+ int threadId = i;
+ new Thread(() -> {
+ try {
+ for (int j = 0; j < totalKeysPerThread; j++) {
+ cache.put(threadId + "-" + j, Integer.toString(j));
+ }
+ } finally {
+ latch.countDown();
+ }
+ }).start();
+ }
+ latch.await();
+ assertEquals(maximumCacheSize, cache.size());
+ assertEquals(totalKeysPerThread * threads - maximumCacheSize,
counter.get());
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = { 1, 2, 5, 10, 20, 50, 100, 500 })
+ void concurrentPutWithCollisions(int maximumCacheSize) throws Exception {
+ int threads = Runtime.getRuntime().availableProcessors();
+ int totalKeys = 1_000;
+ AtomicInteger counter = new AtomicInteger();
+ SimpleLRUCache<String, String> cache = new SimpleLRUCache<>(16,
maximumCacheSize, v -> counter.incrementAndGet());
+ CountDownLatch latch = new CountDownLatch(threads);
+ for (int i = 0; i < threads; i++) {
+ new Thread(() -> {
+ try {
+ for (int j = 0; j < totalKeys; j++) {
+ cache.put(Integer.toString(j), Integer.toString(j));
+ }
+ } finally {
+ latch.countDown();
+ }
+ }).start();
+ }
+ latch.await();
+ assertEquals(maximumCacheSize, cache.size());
+ counter.set(0);
+ for (int j = 0; j < maximumCacheSize; j++) {
+ cache.put(Integer.toString(j), "OK");
+ }
+ assertEquals(maximumCacheSize, counter.get());
+ for (int j = 0; j < maximumCacheSize; j++) {
+ assertEquals("OK", cache.get(Integer.toString(j)));
+ }
+ }
}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
index c36c1640259..c1fcb77d41b 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
@@ -17,12 +17,13 @@
package org.apache.camel.support.cache;
import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiFunction;
@@ -40,6 +41,10 @@ import java.util.function.Function;
public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> {
static final float DEFAULT_LOAD_FACTOR = 0.75f;
+ /**
+ * The minimum size of the queue of changes.
+ */
+ static final int MINIMUM_QUEUE_SIZE = 128;
/**
* The flag indicating that an eviction process is in progress.
*/
@@ -51,7 +56,7 @@ public class SimpleLRUCache<K, V> extends
ConcurrentHashMap<K, V> {
/**
* The last changes recorded.
*/
- private final Queue<Entry<K, V>> lastChanges = new
ConcurrentLinkedQueue<>();
+ private volatile Deque<Entry<K, V>> lastChanges = new
ConcurrentLinkedDeque<>();
/**
* The total amount of changes recorded.
*/
@@ -227,14 +232,31 @@ public class SimpleLRUCache<K, V> extends
ConcurrentHashMap<K, V> {
}
/**
- * Indicates whether an eviction is needed. An eviction can be triggered
if the size of the map or the queue of
- * changes exceeds the maximum allowed size which is respectively {@code
maximumCacheSize} and
- * {@code 2 * maximumCacheSize}.
+ * Indicates whether an eviction is needed. An eviction can be triggered
if either the cache or the queue is full.
*
* @return {@code true} if an eviction is needed, {@code false} otherwise.
*/
private boolean evictionNeeded() {
- return size() > maximumCacheSize || getQueueSize() > 2 *
maximumCacheSize;
+ return isCacheFull() || isQueueFull();
+ }
+
+ /**
+ * Indicates whether the size of the map exceeds the maximum allowed size
which is {@code maximumCacheSize}.
+ *
+ * @return {@code true} if the cache is full, {@code false} otherwise.
+ */
+ private boolean isCacheFull() {
+ return size() > maximumCacheSize;
+ }
+
+ /**
+ * Indicates whether the size of the queue of changes exceeds the maximum
allowed size which is the max value
+ * between {@link #MINIMUM_QUEUE_SIZE} and {@code 2 * maximumCacheSize}.
+ *
+ * @return {@code true} if the queue is full, {@code false} otherwise.
+ */
+ private boolean isQueueFull() {
+ return getQueueSize() > Math.max(2 * maximumCacheSize,
MINIMUM_QUEUE_SIZE);
}
/**
@@ -248,6 +270,24 @@ public class SimpleLRUCache<K, V> extends
ConcurrentHashMap<K, V> {
return oldest;
}
+ /**
+ * Removes duplicates from the queue of changes.
+ */
+ private void compressChanges() {
+ Deque<Entry<K, V>> currentChanges = this.lastChanges;
+ Deque<Entry<K, V>> newChanges = new ConcurrentLinkedDeque<>();
+ this.lastChanges = newChanges;
+ Set<K> keys = new HashSet<>();
+ Entry<K, V> entry;
+ while ((entry = currentChanges.pollLast()) != null) {
+ if (keys.add(entry.getKey())) {
+ newChanges.addFirst(entry);
+ } else {
+ totalChanges.decrement();
+ }
+ }
+ }
+
/**
* The internal context of all write operations.
*/
@@ -274,12 +314,17 @@ public class SimpleLRUCache<K, V> extends
ConcurrentHashMap<K, V> {
public void close() {
if (cache.evictionNeeded() && cache.eviction.compareAndSet(false,
true)) {
try {
- while (cache.evictionNeeded()) {
- Entry<K, V> oldest = cache.nextOldestChange();
- if (oldest != null && cache.remove(oldest.getKey(),
oldest.getValue())) {
- cache.evict.accept(oldest.getValue());
+ do {
+ cache.compressChanges();
+ if (cache.isCacheFull()) {
+ Entry<K, V> oldest = cache.nextOldestChange();
+ if (oldest != null &&
cache.remove(oldest.getKey(), oldest.getValue())) {
+ cache.evict.accept(oldest.getValue());
+ }
+ } else {
+ break;
}
- }
+ } while (cache.evictionNeeded());
} finally {
cache.eviction.set(false);
}