mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107704159
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) {
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
+        // copy the observed stream time, for use in deciding whether to drop records during restore,
+        // when records have exceeded the store's grace period.
+        long streamTimeForRestore = observedStreamTime;

Review Comment:
I did not have a concrete proposal. It should be fine, I guess.

Currently, `streamTime` is tracked per task (based on the input records across all partitions). And yes, there are all kinds of tricky things, as you call out. Even with just a `filter()`, downstream processors see only a subset of the data, so their internal stream-time (if they have any) can differ, i.e., lag behind. Caching has a similar effect.

There is also a proposal to let Kafka Streams track stream-time per processor.

Bottom line: it's complicated and needs a proper design and a KIP of its own.
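A toy model of the review comment's point about `filter()`, in plain Java with no Kafka dependency. All names here (`StreamTimeDemo`, `StreamTimeTracker`, `Rec`, `isExpired`) are hypothetical, and the expiry rule `timestamp < streamTime - gracePeriod` is an assumed simplification rather than the store's actual code. Task-level stream-time advances on every input record, while a processor behind a filter only sees the records that pass, so its own stream-time lags. As a result, the same record can count as expired under one view of stream-time but not the other.

```java
import java.util.List;

// Hypothetical, simplified model; not Kafka Streams' actual classes.
public class StreamTimeDemo {

    // Minimal record: a timestamp plus whether a downstream filter() keeps it.
    record Rec(long timestamp, boolean passesFilter) {}

    // Tracks stream-time as the max timestamp observed so far.
    static final class StreamTimeTracker {
        // -1 means "no record observed yet" (assumption for this sketch).
        private long streamTime = -1L;

        void observe(long timestamp) {
            streamTime = Math.max(streamTime, timestamp);
        }

        long streamTime() {
            return streamTime;
        }
    }

    // Assumed grace-period rule: a record is expired if it is older than
    // stream-time minus the grace period.
    static boolean isExpired(long timestamp, long streamTime, long gracePeriodMs) {
        return timestamp < streamTime - gracePeriodMs;
    }

    public static void main(String[] args) {
        List<Rec> input = List.of(
            new Rec(10, true),
            new Rec(50, false), // removed by the filter
            new Rec(20, true),
            new Rec(60, false), // removed by the filter
            new Rec(30, true));

        StreamTimeTracker taskTime = new StreamTimeTracker();       // per task: sees all input
        StreamTimeTracker downstreamTime = new StreamTimeTracker(); // per processor, after filter()

        for (Rec r : input) {
            taskTime.observe(r.timestamp());
            if (r.passesFilter()) {
                downstreamTime.observe(r.timestamp());
            }
        }

        long grace = 20;
        System.out.println("task stream-time:       " + taskTime.streamTime());       // 60
        System.out.println("downstream stream-time: " + downstreamTime.streamTime()); // 30
        // The same late record (ts=25) is judged differently by the two views.
        System.out.println(isExpired(25, taskTime.streamTime(), grace));       // true  (25 < 40)
        System.out.println(isExpired(25, downstreamTime.streamTime(), grace)); // false (25 < 10 is false)
    }
}
```

Which stream-time a grace-period check reads from decides which records get dropped, which is part of why per-processor stream-time needs its own design.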