This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 96b1db510a3 KAFKA-14415: Faster ThreadCache (#12903)
96b1db510a3 is described below

commit 96b1db510a32b99b57c27a9b3001c9e15af722d4
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Dec 7 02:22:42 2022 +0100

    KAFKA-14415: Faster ThreadCache (#12903)
    
    Optimization of `ThreadCache`. The original implementation showed
    a significant slowdown when many caches were registered.
    
    `sizeBytes` was called at least once, and potentially many times,
    in every `put`, and its cost was linear in the number of caches
    (= number of state stores, so typically proportional to the number
    of tasks). That means that with every additional task, every put
    gets a little slower. This was confirmed experimentally.
    
    In this change, we modify the implementation of `ThreadCache` to
    keep track of the total size in bytes. To be independent of the
    concrete implementation of the underlying cache, we record the
    cache's size before every modifying operation and, afterwards, add
    the difference between its new and old size to the running total.
    To do this we acquire the object monitor of the cache, but since
    all modifying operations on the caches are already synchronized,
    this should not cause extra overhead.
    
    This change also fixes a `ConcurrentModificationException` that could
    be thrown in a race between `sizeBytes` and `getOrCreate`.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../kafka/streams/state/internals/ThreadCache.java | 63 +++++++++++++++-------
 1 file changed, 43 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index cf3a39a8043..60a934e6b46 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * An in-memory LRU cache store similar to {@link MemoryLRUCache} but 
byte-based, not
@@ -40,6 +41,9 @@ public class ThreadCache {
     private final StreamsMetricsImpl metrics;
     private final Map<String, NamedCache> caches = new HashMap<>();
 
+    // Invariant: equal to sum of sizeInBytes of caches
+    private final AtomicLong sizeInBytes = new AtomicLong();
+
     // internal stats
     private long numPuts = 0;
     private long numGets = 0;
@@ -81,9 +85,13 @@ public class ThreadCache {
                 return;
             }
             final CircularIterator<NamedCache> circularIterator = new 
CircularIterator<>(caches.values());
-            while (sizeBytes() > maxCacheSizeBytes) {
+            while (sizeInBytes.get() > maxCacheSizeBytes) {
                 final NamedCache cache = circularIterator.next();
-                cache.evict();
+                synchronized (cache) {
+                    final long oldSize = cache.sizeInBytes();
+                    cache.evict();
+                    sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
+                }
                 numEvicts++;
             }
         } else {
@@ -133,7 +141,12 @@ public class ThreadCache {
         if (cache == null) {
             return;
         }
-        cache.flush();
+
+        synchronized (cache) {
+            final long oldSize = cache.sizeInBytes();
+            cache.flush();
+            sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
+        }
 
         if (log.isTraceEnabled()) {
             log.trace("Cache stats on flush: #puts={}, #gets={}, #evicts={}, 
#flushes={}", puts(), gets(), evicts(), flushes());
@@ -158,15 +171,24 @@ public class ThreadCache {
         numPuts++;
 
         final NamedCache cache = getOrCreateCache(namespace);
-        cache.put(key, value);
-        maybeEvict(namespace);
+
+        synchronized (cache) {
+            final long oldSize = cache.sizeInBytes();
+            cache.put(key, value);
+            sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
+            maybeEvict(namespace, cache);
+        }
     }
 
     public LRUCacheEntry putIfAbsent(final String namespace, final Bytes key, 
final LRUCacheEntry value) {
         final NamedCache cache = getOrCreateCache(namespace);
-
-        final LRUCacheEntry result = cache.putIfAbsent(key, value);
-        maybeEvict(namespace);
+        final LRUCacheEntry result;
+        synchronized (cache) {
+            final long oldSize = cache.sizeInBytes();
+            result = cache.putIfAbsent(key, value);
+            sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
+            maybeEvict(namespace, cache);
+        }
 
         if (result == null) {
             numPuts++;
@@ -186,7 +208,13 @@ public class ThreadCache {
             return null;
         }
 
-        return cache.delete(key);
+        final LRUCacheEntry entry;
+        synchronized (cache) {
+            final long oldSize = cache.sizeInBytes();
+            entry = cache.delete(key);
+            sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
+        }
+        return entry;
     }
 
     public MemoryLRUCacheBytesIterator range(final String namespace, final 
Bytes from, final Bytes to) {
@@ -241,27 +269,20 @@ public class ThreadCache {
     }
 
     long sizeBytes() {
-        long sizeInBytes = 0;
-        for (final NamedCache namedCache : caches.values()) {
-            sizeInBytes += namedCache.sizeInBytes();
-            if (isOverflowing(sizeInBytes)) {
-                return Long.MAX_VALUE;
-            }
-        }
-        return sizeInBytes;
+        return sizeInBytes.get();
     }
 
     synchronized void close(final String namespace) {
         final NamedCache removed = caches.remove(namespace);
         if (removed != null) {
+            sizeInBytes.getAndAdd(-removed.sizeInBytes());
             removed.close();
         }
     }
 
-    private void maybeEvict(final String namespace) {
+    private void maybeEvict(final String namespace, final NamedCache cache) {
         int numEvicted = 0;
-        while (sizeBytes() > maxCacheSizeBytes) {
-            final NamedCache cache = getOrCreateCache(namespace);
+        while (sizeInBytes.get() > maxCacheSizeBytes) {
             // we abort here as the put on this cache may have triggered
             // a put on another cache. So even though the sizeInBytes() is
             // still > maxCacheSizeBytes there is nothing to evict from this
@@ -269,7 +290,9 @@ public class ThreadCache {
             if (cache.isEmpty()) {
                 return;
             }
+            final long oldSize = cache.sizeInBytes();
             cache.evict();
+            sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
             numEvicts++;
             numEvicted++;
         }

Reply via email to