mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1106567105


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) {
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
+        // copy the observed stream time, for use in deciding whether to drop records during restore,
+        // when records have exceeded the store's grace period.
+        long streamTimeForRestore = observedStreamTime;

Review Comment:
   I guess the question is: what is the value of `observedStreamTime` when we 
start the restore? Are you saying it's `-1` and we basically "replay" 
`observedStreamTime` during restore? I guess I got confused with the 
"streamTime" that is tracked by the KS runtime and preserved across restarts; 
the store does not use that one (IIRC), but rather tracks its own time, right?
   
   Maybe it's best to update some variable names? In the end, we do a "real 
replay" of stream-time for "grace period", and we apply an optimization for 
"history retention" by looking ahead (to the end of the batch) -> 
`endOfBatchStreamTime`. I guess follow-up work (independent of this KIP) might 
be to actually make use of the KS runtime streamTime instead of tracking time 
inside the store. We then won't need `observedStreamTime` any longer, as we 
could look ahead to the "end-of-restore stream-time" (not just "end-of-batch").
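
   To make the distinction concrete, here is a rough, simplified sketch of the 
two notions of time during restore. It is not the code from this PR: the class 
`RestoreSketch`, the `Rec` type, and the `gracePeriod` / `historyRetention` 
parameters are hypothetical, and the history-retention check is reduced to a 
plain timestamp comparison. It assumes the store's own stream time starts at 
`-1`, as asked above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the RocksDBVersionedStore implementation.
public class RestoreSketch {

    // Hypothetical stand-in for a changelog record; only key and timestamp matter here.
    static final class Rec {
        final String key;
        final long timestamp;

        Rec(final String key, final long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Assumption: the store tracks its own stream time, starting at -1 before restore.
    long observedStreamTime = -1L;

    // Returns the keys of the records that would actually be written during restore.
    List<String> restoreBatch(final List<Rec> records,
                              final long gracePeriod,
                              final long historyRetention) {
        // Look ahead to the end of the batch; used only for the history retention optimization.
        long endOfBatchStreamTime = observedStreamTime;
        for (final Rec r : records) {
            endOfBatchStreamTime = Math.max(endOfBatchStreamTime, r.timestamp);
        }

        final List<String> kept = new ArrayList<>();
        for (final Rec r : records) {
            // "Real replay": advance stream time record by record, as in normal processing.
            observedStreamTime = Math.max(observedStreamTime, r.timestamp);

            // Grace period is evaluated against the replayed stream time.
            if (r.timestamp < observedStreamTime - gracePeriod) {
                continue;
            }
            // Optimization: skip records that would be expired by the end of the batch anyway.
            if (r.timestamp < endOfBatchStreamTime - historyRetention) {
                continue;
            }
            kept.add(r.key);
        }
        return kept;
    }
}
```

   With a batch of `a@10, b@100, c@200, d@30` and both periods set to 150, `a` 
is within grace when it is replayed but is skipped by the look-ahead, while `d` 
is dropped by the grace check because stream time has already reached 200.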



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
