[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jinyong Choi updated KAFKA-15302:
---------------------------------
    Description: 
When {{store.all()}} is called inside a punctuation callback registered with {{this.context.schedule()}}, the iterator can return a stale (previous) value. In other words, even though the value for a key has been updated from 1 to 2, the iterator returns 1 instead of 2.

The test code below prints '!!! BROKEN !!!' when this happens. The failure is intermittent, but the added logging shows that the cache is flushed during the {{while}} loop, after {{all()}} has been called. After the flush, the named cache no longer holds an entry for the key (the cache-side iterator yields null), so the merged iterator falls back to the value from RocksDB. That value differs from the one a subsequent {{get()}} call returns for the same key. This may be related to RocksDB's consistent-read behavior (an iterator reads from the point-in-time view that existed when it was created), although the exact cause is not yet certain.

Calling {{store.flush()}} before {{all()}} avoids the error.

* test code (forked from balajirrao and modified to reproduce this issue) [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]

{code:java}
// Imports used by the enclosing processor class
import java.util.Objects;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;

private void forwardAll(final long timestamp) {
    System.err.println("forwardAll Start");
    // try-with-resources closes the iterator even if the check below throws
    try (final KeyValueIterator<String, Integer> kvList = this.kvStore.all()) {
        while (kvList.hasNext()) {
            final KeyValue<String, Integer> entry = kvList.next();
            final Record<String, Integer> msg =
                new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
            // Re-read the same key through the store (cache first, then RocksDB)
            final Integer storeValue = this.kvStore.get(entry.key);
            // Compare by value: '!=' on boxed Integers compares object references
            if (!Objects.equals(entry.value, storeValue)) {
                System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
                    + " Expected in stored(Cache or Store) value: " + storeValue
                    + " but KeyValueIterator value: " + entry.value);
                throw new RuntimeException("Broken!");
            }
            this.context.forward(msg);
        }
    }
}
{code}
* log file (with extra logging added to the Kafka Streams source)
{code:java}
# console log
sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
[info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
...
[info] running Coordinator 1
appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
[0] starting instance +1
forwardAll Start
[0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1
# log file
...
01:05:00.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
01:05:00.388 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():7873 entries:7873
01:05:00.434 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.p.i.ProcessorStateManager -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] stream-task [0_0] Flushed cache or buffer Counts
...
01:05:00.587 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.s.i.CachingKeyValueStore -- KeyValueIterator<Bytes, byte[]> all()
01:05:00.588 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.RocksDBStore -- RocksDB KeyValueIterator all
01:05:00.590 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.ThreadCache -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] MemoryLRUCacheBytesIterator cache all()
01:05:00.591 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- NamedCache allKeys() size():325771
01:05:00.637 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- NamedCache keySetIterator() TreeSet size():325771
...
01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- 0_0-Counts evict() isDirty() eldest.size():103
01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5636398, #misses=6233857, #overwrites=639857, #flushes=402
01:05:07.053 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():3459 entries:3460 <= NamedCache.flush()
...
ThreadCache set nextEntry is null key:636398 <= MemoryLRUCacheBytesIterator.internalNext()
...
01:06:31.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] WARN o.a.k.s.s.i.AbstractMergedSortedCacheStoreIterator -- -AbstractMergedSortedCacheStoreIterator- -> store nextCacheKey: [null], nextStoreKey: [636398] nextStoreValue: [1]
[0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1
{code}

> Stale value returned when using store.all() in punctuation function.
> --------------------------------------------------------------------
>
>                 Key: KAFKA-15302
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15302
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.1
>            Reporter: Jinyong Choi
>            Priority: Major
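To make the mechanism suspected in the description concrete, here is a toy model in Java. It is not Kafka Streams code: the class {{StaleIteratorDemo}}, its methods, and the keys "a" and "b" are hypothetical. It assumes, based on the logs, that {{all()}} merges a cache-side iterator (whose key set is captured up front while values are looked up lazily) with a store-side iterator that reads a point-in-time view, as a RocksDB iterator does, and that the cache is flushed partway through the scan.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model only (hypothetical, NOT Kafka Streams code): a caching store whose
// scan merges dirty cache entries with a point-in-time view of the underlying store.
public class StaleIteratorDemo {
    private final TreeMap<String, Integer> cache = new TreeMap<>(); // dirty entries, not yet flushed
    private final TreeMap<String, Integer> store = new TreeMap<>(); // stands in for RocksDB

    void put(String key, int value) { cache.put(key, value); }

    // Point lookup: cache first, then the underlying store (like get() in the report).
    Integer get(String key) { return cache.containsKey(key) ? cache.get(key) : store.get(key); }

    // Moves all dirty entries to the underlying store and empties the cache.
    void flush() { store.putAll(cache); cache.clear(); }

    // Scans every key and flushes the cache after the first key, mimicking an eviction
    // during the while loop. Returns the keys whose scanned value differs from get(key).
    List<String> scan(boolean flushBeforeAll) {
        if (flushBeforeAll) {
            flush(); // the workaround from the report: flush before all()
        }
        // Assumption: the store side sees a snapshot taken now, like a RocksDB iterator.
        TreeMap<String, Integer> storeView = new TreeMap<>(store);
        // Assumption: the cache side captures its keys now but reads values lazily.
        TreeSet<String> cacheKeys = new TreeSet<>(cache.keySet());
        TreeSet<String> allKeys = new TreeSet<>(storeView.keySet());
        allKeys.addAll(cacheKeys);

        List<String> mismatches = new ArrayList<>();
        boolean flushedMidScan = false;
        for (String key : allKeys) {
            // Null once the entry has been flushed out of the cache.
            Integer cached = cacheKeys.contains(key) ? cache.get(key) : null;
            // Cache value wins; otherwise fall back to the store snapshot.
            Integer scanned = cached != null ? cached : storeView.get(key);
            if (!Objects.equals(scanned, get(key))) {
                mismatches.add(key);
            }
            if (!flushedMidScan) {
                flush();
                flushedMidScan = true;
            }
        }
        return mismatches;
    }

    // Both keys reach the store with value 1; then "b" is updated to 2 in the cache only.
    static StaleIteratorDemo seeded() {
        StaleIteratorDemo s = new StaleIteratorDemo();
        s.put("a", 1);
        s.put("b", 1);
        s.flush();
        s.put("b", 2);
        return s;
    }

    public static void main(String[] args) {
        System.out.println("without flush: " + seeded().scan(false));
        System.out.println("with flush:    " + seeded().scan(true));
    }
}
{code}

Under these assumptions, the plain scan reports {{[b]}}: once the cache entry for "b" is flushed mid-scan, the merged value falls back to the stale store snapshot (1) while {{get("b")}} already sees 2. The flush-first scan reports {{[]}}, mirroring the {{store.flush()}} workaround. Whether the real caching store behaves exactly this way is an assumption of the model.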
--
This message was sent by Atlassian Jira
(v8.20.10#820010)