This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 96b1db510a3 KAFKA-14415: Faster ThreadCache (#12903)
96b1db510a3 is described below
commit 96b1db510a32b99b57c27a9b3001c9e15af722d4
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Dec 7 02:22:42 2022 +0100
KAFKA-14415: Faster ThreadCache (#12903)
Optimization of `ThreadCache`. The original implementation showed a
significant slowdown when many caches were registered.
`sizeBytes` was called at least once, and potentially many times,
in every `put`, and its cost was linear in the number of caches
(= number of state stores, so typically proportional to the number
of tasks). That means that, with every additional task, every `put`
got a little slower. This was confirmed experimentally.
In this change, we modify the implementation of `ThreadCache` to
keep track of the total size in bytes. To be independent of the
concrete implementation of the underlying cache, we record the size
of the cache before every modifying operation and, afterwards, add
the difference between its new and old size to the total. For this,
we acquire the object monitor of the cache, but since all modifying
operations on the caches are synchronized already, this should not
cause extra overhead.
This change also fixes a `ConcurrentModificationException` that could
be thrown in a race between `sizeBytes` and `getOrCreate`.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
.../kafka/streams/state/internals/ThreadCache.java | 63 +++++++++++++++-------
1 file changed, 43 insertions(+), 20 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index cf3a39a8043..60a934e6b46 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
/**
* An in-memory LRU cache store similar to {@link MemoryLRUCache} but
byte-based, not
@@ -40,6 +41,9 @@ public class ThreadCache {
private final StreamsMetricsImpl metrics;
private final Map<String, NamedCache> caches = new HashMap<>();
+ // Invariant: equal to sum of sizeInBytes of caches
+ private final AtomicLong sizeInBytes = new AtomicLong();
+
// internal stats
private long numPuts = 0;
private long numGets = 0;
@@ -81,9 +85,13 @@ public class ThreadCache {
return;
}
final CircularIterator<NamedCache> circularIterator = new
CircularIterator<>(caches.values());
- while (sizeBytes() > maxCacheSizeBytes) {
+ while (sizeInBytes.get() > maxCacheSizeBytes) {
final NamedCache cache = circularIterator.next();
- cache.evict();
+ synchronized (cache) {
+ final long oldSize = cache.sizeInBytes();
+ cache.evict();
+ sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
+ }
numEvicts++;
}
} else {
@@ -133,7 +141,12 @@ public class ThreadCache {
if (cache == null) {
return;
}
- cache.flush();
+
+ synchronized (cache) {
+ final long oldSize = cache.sizeInBytes();
+ cache.flush();
+ sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
+ }
if (log.isTraceEnabled()) {
log.trace("Cache stats on flush: #puts={}, #gets={}, #evicts={},
#flushes={}", puts(), gets(), evicts(), flushes());
@@ -158,15 +171,24 @@ public class ThreadCache {
numPuts++;
final NamedCache cache = getOrCreateCache(namespace);
- cache.put(key, value);
- maybeEvict(namespace);
+
+ synchronized (cache) {
+ final long oldSize = cache.sizeInBytes();
+ cache.put(key, value);
+ sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
+ maybeEvict(namespace, cache);
+ }
}
public LRUCacheEntry putIfAbsent(final String namespace, final Bytes key,
final LRUCacheEntry value) {
final NamedCache cache = getOrCreateCache(namespace);
-
- final LRUCacheEntry result = cache.putIfAbsent(key, value);
- maybeEvict(namespace);
+ final LRUCacheEntry result;
+ synchronized (cache) {
+ final long oldSize = cache.sizeInBytes();
+ result = cache.putIfAbsent(key, value);
+ sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
+ maybeEvict(namespace, cache);
+ }
if (result == null) {
numPuts++;
@@ -186,7 +208,13 @@ public class ThreadCache {
return null;
}
- return cache.delete(key);
+ final LRUCacheEntry entry;
+ synchronized (cache) {
+ final long oldSize = cache.sizeInBytes();
+ entry = cache.delete(key);
+ sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
+ }
+ return entry;
}
public MemoryLRUCacheBytesIterator range(final String namespace, final
Bytes from, final Bytes to) {
@@ -241,27 +269,20 @@ public class ThreadCache {
}
long sizeBytes() {
- long sizeInBytes = 0;
- for (final NamedCache namedCache : caches.values()) {
- sizeInBytes += namedCache.sizeInBytes();
- if (isOverflowing(sizeInBytes)) {
- return Long.MAX_VALUE;
- }
- }
- return sizeInBytes;
+ return sizeInBytes.get();
}
synchronized void close(final String namespace) {
final NamedCache removed = caches.remove(namespace);
if (removed != null) {
+ sizeInBytes.getAndAdd(-removed.sizeInBytes());
removed.close();
}
}
- private void maybeEvict(final String namespace) {
+ private void maybeEvict(final String namespace, final NamedCache cache) {
int numEvicted = 0;
- while (sizeBytes() > maxCacheSizeBytes) {
- final NamedCache cache = getOrCreateCache(namespace);
+ while (sizeInBytes.get() > maxCacheSizeBytes) {
// we abort here as the put on this cache may have triggered
// a put on another cache. So even though the sizeInBytes() is
// still > maxCacheSizeBytes there is nothing to evict from this
@@ -269,7 +290,9 @@ public class ThreadCache {
if (cache.isEmpty()) {
return;
}
+ final long oldSize = cache.sizeInBytes();
cache.evict();
+ sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
numEvicts++;
numEvicted++;
}