[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752451#comment-17752451 ]
Jinyong Choi commented on KAFKA-15302:
--------------------------------------

[~mjsax], [~guozhang] If you perform a delete() within a while() loop over an open iterator, the interaction maybeEvict() -> flush() can cause the value of a key that has not yet been traversed to be returned as stale data. I therefore consider this a bug.

The main change is as follows: I modified the code so that the store gives the cache a hint about whether maybeEvict() may be invoked, based on the current state of the RocksDB KeyValueStore, specifically whether it has open iterators (i.e. is in a snapshot state). The snippet below shows the main changes; the full diff is in the branch linked here (I haven't modified the test code).

[https://github.com/apache/kafka/compare/trunk...jinyongchoi:kafka:KAFKA-15302-testing]

{code:java}
// streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
...
@Override
public boolean isEvictionInvocationViable() {
    // Eviction (and the flush it can trigger) is only safe
    // while no iterators are open on this store.
    return openIterators.isEmpty();
}

// streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
...
private void putInternal(final Bytes key, final byte[] value) {
    context.cache().put(
        cacheName,
        key,
        new LRUCacheEntry(
            value,
            context.headers(),
            true,
            context.offset(),
            context.timestamp(),
            context.partition(),
            context.topic()),
        wrapped().isEvictionInvocationViable()); // pass the hint down to the cache

    StoreQueryUtils.updatePosition(position, context);
}

// streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
...
public void put(final String namespace, final Bytes key, final LRUCacheEntry value, final boolean isEvictionViable) {
    numPuts++;

    final NamedCache cache = getOrCreateCache(namespace);
    synchronized (cache) {
        final long oldSize = cache.sizeInBytes();
        cache.put(key, value);
        sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
        if (isEvictionViable) {
            maybeEvict(namespace, cache);
        }
    }
}
{code}

After these modifications, a few thoughts:

1. Performing delete operations during traversal seems unnecessary for SessionStore or TimestampedKeyValueStore, but this should be documented.
2. It works as expected, but the code does not feel very clean.
3. Since flush() is suppressed during the while() loop, many keys accumulate in the cache. As their values are null, the memory burden does not appear significant, but caution is still warranted.
4. Because flush() is inhibited during the while() loop, the subsequent flush (shown below) took about 10 seconds. Cases that process 5,000,000 items at once should be rare, but this also demands attention.

{code:java}
21:26:17.509 [...-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush start dirtyKeys.size():5000000 entries:5000000
21:26:26.874 [...-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush end dirtyKeys.size():0 entries:5000000
{code}

5. If fixing this properly will take time, documenting the behavior in advance would aid developers' understanding.

I'm not coming up with any better ideas, so if the correct fix will take time, I agree that we should update the documentation first.
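For reference, here is a minimal sketch of the access pattern this hint is meant to cover: a punctuator that deletes entries from the same store it is iterating. The processor class, the store name "counts", the value types, and the schedule interval are illustrative assumptions, not part of the patch.

{code:java}
import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical processor illustrating the problematic pattern:
// writing to a store while an iterator over it is still open.
public class SweepingProcessor implements Processor<String, Integer, String, Integer> {
    private KeyValueStore<String, Integer> kvStore;

    @Override
    public void init(final ProcessorContext<String, Integer> context) {
        kvStore = context.getStateStore("counts"); // assumed store name
        context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Integer> entry = iter.next();
                    // Each write goes through ThreadCache.put(). Without the hint,
                    // it can trigger maybeEvict() -> flush() while the iterator is
                    // open, so keys not yet traversed may come back stale. With the
                    // hint, eviction is skipped while openIterators is non-empty.
                    kvStore.delete(entry.key);
                }
            }
        });
    }

    @Override
    public void process(final Record<String, Integer> record) {
        kvStore.put(record.key(), record.value());
    }
}
{code}

With the patch applied, the delete() calls above no longer trigger eviction until the iterator is closed; the accumulated dirty entries are flushed afterwards, which is what the 10-second flush in item 4 above measures.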
> Stale value returned when using store.all() in punctuation function.
> --------------------------------------------------------------------
>
>                 Key: KAFKA-15302
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15302
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.1
>            Reporter: Jinyong Choi
>            Priority: Major
>
> When using the store.all() function within the punctuation function of this.context.schedule, the previous value is returned. In other words, even though the value has been updated from 1 to 2, it doesn't return 2; instead, it returns 1.
> In the provided test code you can see the output '!!! BROKEN !!!'. This doesn't occur 100% of the time, but added logging shows that the cache is flushed during the while loop that follows the all() call. As a result, the named cache holds a null value, so the read falls through to RocksDB, and the value returned by .get() differs from the iterator's value. This is possibly due to RocksDB's consistent-read (snapshot) behavior, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be any errors.
>
> * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>
> {code:java}
> private void forwardAll(final long timestamp) {
>     System.err.println("forwardAll Start");
>
>     KeyValueIterator<String, Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue<String, Integer> entry = kvList.next();
>         final Record<String, Integer> msg = new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);
>
>         if (!entry.value.equals(storeValue)) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
>                     + " Expected in stored(Cache or Store) value: " + storeValue
>                     + " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>
> * log file (extra logging added to the Streams source)
>
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1
>
> # log file
> ...
> 01:05:00.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():7873 entries:7873
> 01:05:00.434 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.p.i.ProcessorStateManager -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.s.i.CachingKeyValueStore -- KeyValueIterator<Bytes, byte[]> all()
> 01:05:00.588 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.RocksDBStore -- RocksDB KeyValueIterator all
> 01:05:00.590 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.ThreadCache -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] MemoryLRUCacheBytesIterator cache all()
> 01:05:00.591 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- NamedCache allKeys() size():325771
> 01:05:00.637 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- NamedCache keySetIterator() TreeSet size():325771
> ...
> 01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- 0_0-Counts evict() isDirty() eldest.size():103
> 01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5636398, #misses=6233857, #overwrites=639857, #flushes=402
> 01:05:07.053 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():3459 entries:3460 <= NamedCache.flush()
> ...
> ThreadCache set nextEntry is null key:636398 <= MemoryLRUCacheBytesIterator.internalNext()
> ...
> 01:06:31.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] WARN o.a.k.s.s.i.AbstractMergedSortedCacheStoreIterator -- -AbstractMergedSortedCacheStoreIterator- -> store nextCacheKey: [null], nextStoreKey: [636398] nextStoreValue: [1]
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1
> {code}
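For completeness, here is a minimal sketch of the workaround mentioned in the description: calling {{store.flush()}} before {{all()}}. It is written as a drop-in variant of the forwardAll() method in the test code above, assuming the same kvStore and context fields; the broken-check is omitted since the flushed values are consistent.

{code:java}
// Workaround sketch: flush the cache into RocksDB before iterating, so the
// while() loop no longer overlaps with cache eviction and flush.
private void forwardAll(final long timestamp) {
    this.kvStore.flush(); // push dirty cache entries down to RocksDB first

    try (final KeyValueIterator<String, Integer> kvList = this.kvStore.all()) {
        while (kvList.hasNext()) {
            final KeyValue<String, Integer> entry = kvList.next();
            this.context.forward(new Record<>(entry.key, entry.value, context.currentSystemTimeMs()));
        }
    }
}
{code}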