mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1106567105
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ##########
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) {
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
+        // copy the observed stream time, for use in deciding whether to drop records during restore,
+        // when records have exceeded the store's grace period.
+        long streamTimeForRestore = observedStreamTime;

Review Comment:
I guess the question is: what is the value of `observedStreamTime` when we start the restore? Are you saying it's `-1` and we basically "replay" `observedStreamTime` during restore? I guess I got confused with the "streamTime" that is tracked by the KS runtime and preserved across restarts; but the store does not use it (IIRC), and rather tracks its own time, right? Maybe it's best to update some variable names?

In the end, we do a "real replay" of stream-time for the "grace period", and we apply an optimization for "history retention" by looking ahead (to the end of the batch) -> `endOfBatchStreamTime`.

I guess follow-up work (independent of this KIP) might be to actually make use of the KS runtime streamTime instead of tracking it inside the store. Then we won't need `observedStreamTime` any longer, as we could look ahead to the "end-of-restore stream-time" (not just "end-of-batch").
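To make the distinction between the "real replay" for the grace period and the look-ahead for history retention concrete, here is a minimal, hypothetical sketch. It is not the actual `RocksDBVersionedStore` code: `RestoreSketch`, `Rec`, `gracePeriodMs`, `historyRetentionMs` and `retentionCutoff` are illustrative names, records carry only a key and a timestamp, and the retention part just computes a cutoff instead of modelling the store's real expiry rules.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the restore logic discussed in the review.
// RestoreSketch, Rec, gracePeriodMs, historyRetentionMs and retentionCutoff are
// illustrative names only; they are not the RocksDBVersionedStore API.
class RestoreSketch {

    // Stand-in for a changelog record; only the timestamp matters here.
    static final class Rec {
        final String key;
        final long timestamp;

        Rec(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    final long gracePeriodMs;
    final long historyRetentionMs;
    // Stream time tracked by the store itself (not by the Kafka Streams runtime);
    // -1 means "no record observed yet", e.g. at the start of a restore.
    long observedStreamTime = -1L;
    // Assumption: a cutoff a real store might use when expiring old history.
    long retentionCutoff = Long.MIN_VALUE;
    final List<Rec> restored = new ArrayList<>();

    RestoreSketch(long gracePeriodMs, long historyRetentionMs) {
        this.gracePeriodMs = gracePeriodMs;
        this.historyRetentionMs = historyRetentionMs;
    }

    void restoreBatch(List<Rec> records) {
        // Look-ahead for history retention: use the max timestamp of the whole batch.
        long endOfBatchStreamTime = observedStreamTime;
        for (Rec r : records) {
            endOfBatchStreamTime = Math.max(endOfBatchStreamTime, r.timestamp);
        }
        retentionCutoff = endOfBatchStreamTime - historyRetentionMs;

        // "Real" replay for the grace period: advance stream time record by record,
        // so a record is dropped only if it would have been dropped during processing.
        long streamTimeForRestore = observedStreamTime;
        for (Rec r : records) {
            streamTimeForRestore = Math.max(streamTimeForRestore, r.timestamp);
            if (r.timestamp < streamTimeForRestore - gracePeriodMs) {
                continue; // too late relative to the replayed stream time
            }
            restored.add(r);
        }

        // After the loop, the replayed stream time equals endOfBatchStreamTime.
        observedStreamTime = endOfBatchStreamTime;
    }
}
```

With a grace period of 10 and a batch with timestamps 100, 50, 120, 105 (keys a, b, c, d), the replay keeps a and c and drops b (50 < 100 - 10) and d (105 < 120 - 10). If the grace check used `endOfBatchStreamTime` (120) instead, record a would also be dropped (100 < 110), even though it was accepted when originally processed. That is why the grace period needs the record-by-record replay, while the look-ahead is only applied as an optimization for history retention.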