This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch essobedo/CAMEL-22176/blocking-compression-changes
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d3f20358f36b8ebb24a41907c51ecf4584a2176f
Author: Nicolas Filotto <[email protected]>
AuthorDate: Wed Jun 18 13:19:50 2025 +0200

    CAMEL-22176: camel-core - keep compression changes blocking
---
 .../apache/camel/support/cache/SimpleLRUCache.java | 41 ++++++++++------------
 1 file changed, 19 insertions(+), 22 deletions(-)

diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
index 5c3e79be57f..71793e3e6bf 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
@@ -36,7 +36,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
- * {@code SimpleLRUCache} is a simple implementation of a cache of type Least Recently Used . The implementation doesn't
+ * {@code SimpleLRUCache} is a simple implementation of a Least Recently Used cache. The implementation doesn't
  * accept null values. Generally speaking, the parameters of all the public methods must have a value otherwise a
  * {@code NullPointerException} is thrown.
  *
@@ -47,7 +47,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> {
 
     static final float DEFAULT_LOAD_FACTOR = 0.75f;
     /**
-     * The minimum size of the queue of changes.
+     * The minimum size of the changes.
      */
     static final int MINIMUM_QUEUE_SIZE = 128;
     /**
@@ -55,7 +55,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> {
      */
     private final AtomicBoolean eviction = new AtomicBoolean();
     /**
-     * The lock to prevent the addition of changes during the swap of queue of changes or the cache cleaning.
+     * The lock to prevent the addition of changes during the swap of changes or the cache cleaning.
      */
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
     /**
@@ -68,7 +68,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> {
     private final AtomicReference<Deque<Entry<K, ValueHolder<V>>>> lastChanges
             = new AtomicReference<>(new ConcurrentLinkedDeque<>());
     /**
-     * The total amount of changes recorded.
+     * The total number of changes recorded.
      */
     private final AtomicInteger totalChanges = new AtomicInteger();
     /**
@@ -339,7 +339,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> {
     }
 
     /**
-     * @return the size of the queue of changes.
+     * @return the size of the changes.
      */
     int getQueueSize() {
         return totalChanges.get();
@@ -364,7 +364,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> {
     }
 
     /**
-     * Indicates whether the size of the queue of changes exceeds the maximum allowed size which is the max value
+     * Indicates whether the size of the changes exceeds the maximum allowed size which is the max value
      * between {@link #MINIMUM_QUEUE_SIZE} and {@code 2 * maximumCacheSize}.
      *
      * @return {@code true} if the queue is full, {@code false} otherwise.
@@ -386,28 +386,25 @@ public class SimpleLRUCache<K, V> implements Map<K, V> {
      * Removes duplicates from the queue of changes if the queue is full.
      */
     private void compressChangesIfNeeded() {
-        Deque<Entry<K, ValueHolder<V>>> newChanges;
-        Deque<Entry<K, ValueHolder<V>>> currentChanges;
         lock.writeLock().lock();
         try {
             if (isQueueFull()) {
-                newChanges = new ConcurrentLinkedDeque<>();
-                totalChanges.set(0);
-                currentChanges = lastChanges.getAndSet(newChanges);
-            } else {
-                return;
+                Deque<Entry<K, ValueHolder<V>>> newChanges = new ConcurrentLinkedDeque<>();
+                Deque<Entry<K, ValueHolder<V>>> currentChanges = lastChanges.getAndSet(newChanges);
+                Set<K> keys = new HashSet<>();
+                Entry<K, ValueHolder<V>> entry;
+                int changes = 0;
+                while ((entry = currentChanges.pollLast()) != null) {
+                    if (keys.add(entry.getKey())) {
+                        newChanges.addFirst(entry);
+                        changes++;
+                    }
+                }
+                totalChanges.set(changes);
             }
         } finally {
             lock.writeLock().unlock();
         }
-        Set<K> keys = new HashSet<>();
-        Entry<K, ValueHolder<V>> entry;
-        while ((entry = currentChanges.pollLast()) != null) {
-            if (keys.add(entry.getKey())) {
-                newChanges.addFirst(entry);
-                totalChanges.incrementAndGet();
-            }
-        }
     }
 
     private ValueHolder<V> newValue(V value) {
@@ -419,7 +416,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> {
     }
 
     /**
-     * The internal context of all write operations.
+     * The internal context of all write operations.
      */
     private static class OperationContext<K, V> implements AutoCloseable {
         /**

Reply via email to