mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107704159


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) {
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
+        // copy the observed stream time, for use in deciding whether to drop records during restore,
+        // when records have exceeded the store's grace period.
+        long streamTimeForRestore = observedStreamTime;

Review Comment:
   I did not have a concrete proposal. It should be fine, I guess.
   
   Currently, `streamTime` is tracked per task (based on input records across all partitions). And yes, there are all kinds of tricky things, as you call out. Even with a `filter()`, downstream processors see only a subset of the data, so their "internal stream-time" (if they have any) could be different (i.e., lagging). Caching has a similar effect.
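   To make the `filter()` point concrete, here is a minimal plain-Java sketch (not Kafka Streams code). It assumes stream time is simply the max record timestamp observed so far; `StreamTimeTracker`, the timestamps, and the filter predicate are hypothetical. It tracks that value once for the whole task and once for a processor sitting behind a filter.

```java
import java.util.List;
import java.util.function.LongPredicate;

public class StreamTimeLagSketch {

    // Hypothetical helper: stream time = max record timestamp observed so far.
    static final class StreamTimeTracker {
        private long streamTime = Long.MIN_VALUE;

        void observe(final long timestamp) {
            streamTime = Math.max(streamTime, timestamp);
        }

        long streamTime() {
            return streamTime;
        }
    }

    public static void main(final String[] args) {
        // Input timestamps as seen by the task, in arrival order (made-up values).
        final List<Long> inputTimestamps = List.of(10L, 50L, 20L, 100L, 30L);
        // Hypothetical filter(): only records with timestamp < 60 pass downstream.
        final LongPredicate filter = ts -> ts < 60L;

        final StreamTimeTracker taskTime = new StreamTimeTracker();
        final StreamTimeTracker downstreamTime = new StreamTimeTracker();

        for (final long ts : inputTimestamps) {
            taskTime.observe(ts);           // the task sees every input record
            if (filter.test(ts)) {
                downstreamTime.observe(ts); // the downstream processor sees only filtered records
            }
        }

        System.out.println("task stream time: " + taskTime.streamTime());             // 100
        System.out.println("downstream stream time: " + downstreamTime.streamTime()); // 50
    }
}
```

   With these inputs the task's stream time reaches 100, while the processor behind the filter stays at 50, i.e., it lags.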
   
   There is a proposal to let KS track streamTime per processor, too.
   
   Bottom line: it's complicated and needs proper design and a KIP of its own...
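   For reference, the pattern in the diff hunk above (copying `observedStreamTime` into a local `streamTimeForRestore` before replaying a batch) can be sketched in plain Java. This is a hypothetical illustration, not the actual `RocksDBVersionedStore.restoreBatch()`: records are reduced to bare timestamps, and both the drop rule (a record is dropped when its timestamp is below stream time minus the grace period) and the choice to advance the local copy during the batch are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreGraceSketch {

    /**
     * Hypothetical restore loop: returns the timestamps that would be written back to the store.
     * Works on a local copy of the observed stream time, as in the diff.
     */
    static List<Long> restoreBatch(final long observedStreamTime,
                                   final long gracePeriodMs,
                                   final long[] batchTimestamps) {
        long streamTimeForRestore = observedStreamTime;
        final List<Long> restored = new ArrayList<>();
        for (final long ts : batchTimestamps) {
            // Assumed rule: drop a record older than (stream time - grace period).
            if (ts < streamTimeForRestore - gracePeriodMs) {
                continue;
            }
            // Assumption: advance the local copy of stream time as records are replayed in order.
            streamTimeForRestore = Math.max(streamTimeForRestore, ts);
            restored.add(ts);
        }
        return restored;
    }

    public static void main(final String[] args) {
        // Made-up inputs: stream time 100 before the batch, grace period 20.
        final List<Long> restored = restoreBatch(100L, 20L, new long[] {90L, 130L, 105L, 115L});
        System.out.println(restored); // [90, 130, 115]
    }
}
```

   Whether the local copy should advance during the batch (as assumed here) or stay fixed at its pre-restore value is a design choice; with a fixed copy, the record at 105 in this example would be kept.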



-- 
This is an automated message from the Apache Git Service.