[ https://issues.apache.org/jira/browse/KAFKA-14624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17676934#comment-17676934 ]
Matthias J. Sax commented on KAFKA-14624:
-----------------------------------------

Thanks for reporting this issue. I will need to think about the details more (and look into your project that reproduces it). My first reaction was, "why is the cache not empty?" Registering the callback on the cache might introduce a lot of overhead during restore, so I have doubts about whether it's a good fix.
{quote}If a partition moves from instance 1 to 2 and then back to instance 1
{quote}
If the task/partition moves from instance 1 to 2, the task on instance 1 should be closed and its cache should be gone? Or are you saying the task on instance 1 becomes a standby?

> State restoration is broken with standby tasks and cache-enabled stores in processor API
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14624
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14624
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.1
>            Reporter: Balaji Rao
>            Priority: Major
>
> I found that cache-enabled state stores in PAPI with standby tasks sometimes
> return stale data when a partition moves from one app instance to another
> and back. [Here's|https://github.com/balajirrao/kafka-streams-multi-runner] a
> small project that I used to reproduce the issue.
> I dug around a bit and it seems like it's a bug in standby task state
> restoration when caching is enabled. If a partition moves from instance 1 to
> 2 and then back to instance 1, since the `CachingKeyValueStore` doesn't
> register a restore callback, it can return potentially stale data for
> non-dirty keys.
> I could fix the issue by modifying the `CachingKeyValueStore` to register a
> restore callback in which the restored keys are added to the cache. Is this
> fix in the right direction?
> {code:java}
> // register the store
> context.register(
>     root,
>     (RecordBatchingStateRestoreCallback) records -> {
>         for (final ConsumerRecord<byte[], byte[]> record : records) {
>             put(Bytes.wrap(record.key()), record.value());
>         }
>     }
> );
> {code}
>
> I would like to contribute a fix, if I can get some help!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
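
For readers following the thread, the stale-read scenario described in the report can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyCachingStore` and its methods are hypothetical stand-ins, not Kafka Streams classes; the cache is modeled as write-through, so every cached entry is "non-dirty"; and the cache entry on instance 1 is assumed to survive the move, which is exactly the point the comment above questions.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: hypothetical stand-ins, not Kafka Streams classes.
final class StaleCacheDemo {

    /** A simplified write-through cache in front of an underlying store. */
    static final class ToyCachingStore {
        private final Map<String, String> underlying = new HashMap<>();
        private final Map<String, String> cache = new HashMap<>();

        // Normal processing path: the write reaches both cache and store.
        void put(String key, String value) {
            cache.put(key, value);
            underlying.put(key, value);
        }

        // Reads prefer the cache and fall back to the underlying store.
        String get(String key) {
            String cached = cache.get(key);
            return cached != null ? cached : underlying.get(key);
        }

        // Restoration that writes to the underlying store only
        // (models a store with no restore callback on the caching layer).
        void restoreBypassingCache(String key, String value) {
            underlying.put(key, value);
        }

        // Restoration that also refreshes the cache
        // (models the idea behind the fix proposed in the report).
        void restoreThroughCache(String key, String value) {
            put(key, value);
        }
    }

    static String readAfterRestore(boolean updateCache) {
        ToyCachingStore store = new ToyCachingStore();
        store.put("k", "v1"); // instance 1 is active and caches k=v1

        // Assumption: the partition moves to instance 2, which writes k=v2,
        // and instance 1 later restores that update from the changelog.
        if (updateCache) {
            store.restoreThroughCache("k", "v2");
        } else {
            store.restoreBypassingCache("k", "v2");
        }
        return store.get("k");
    }

    public static void main(String[] args) {
        System.out.println(readAfterRestore(false)); // prints "v1" (stale)
        System.out.println(readAfterRestore(true));  // prints "v2"
    }
}
```

The sketch says nothing about the restore-time overhead raised in the comment; it only shows why a cache that is not updated or invalidated during restoration can serve an outdated value.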