vcrfxia commented on code in PR #13364:
URL: https://github.com/apache/kafka/pull/13364#discussion_r1152358196


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
             return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp;
         }
 
+        private void truncateRecordsToTimestamp(final long timestamp) {
+            if (timestamp <= minTimestamp) {
+                // delete everything in this current segment by replacing it with a degenerate segment
+                initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp);
+                return;
+            }
+
+            final SegmentSearchResult searchResult = find(timestamp, false);
+            // all records with later timestamps should be removed
+            int fullRecordsToTruncate = searchResult.index();
+            // additionally remove the current record as well, if its validFrom equals the
+            // timestamp to truncate to
+            if (searchResult.validFrom() == timestamp) {
+                fullRecordsToTruncate++;

Review Comment:
   Suppose the older segment has records `[validFrom=10, validTo=20)` and
`[validFrom=20, validTo=30)`. Records are stored reverse-sorted by timestamp, so
`[validFrom=20, validTo=30)` has index 0 and `[validFrom=10, validTo=20)` has
index 1. If we want to truncate to timestamp 20, calling `find()` will return
index 0, but the number of (full) records to remove is 1: the record at index 0
starts exactly at the truncation timestamp, so the entire record must be removed.
   
   In contrast, if we are truncating to timestamp 25, calling `find()` will
also return index 0. In this case the number of full records to remove is 0,
because the portion of the record at index 0 before timestamp 25 survives, so
no incrementing is needed. Does that help?
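
The two cases above can be checked with a minimal, self-contained sketch. It is not the actual `RocksDBVersionedStoreSegmentValueFormatter` code: the `Version` class, the list-based segment, and this `find()` (assumed to return the index of the record whose `[validFrom, validTo)` interval contains the timestamp) are hypothetical stand-ins used only to illustrate the counting logic from the diff.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of one segment; not the real Kafka Streams classes.
final class TruncateCountSketch {

    // One record version, valid over the half-open interval [validFrom, validTo).
    static final class Version {
        final long validFrom;
        final long validTo;

        Version(final long validFrom, final long validTo) {
            this.validFrom = validFrom;
            this.validTo = validTo;
        }
    }

    // Assumed behavior of find(): index of the record whose interval contains the timestamp.
    // The list is ordered newest first, so index 0 holds the largest validFrom.
    static int find(final List<Version> reversed, final long timestamp) {
        for (int i = 0; i < reversed.size(); i++) {
            final Version v = reversed.get(i);
            if (v.validFrom <= timestamp && timestamp < v.validTo) {
                return i;
            }
        }
        throw new IllegalArgumentException("timestamp not covered by segment: " + timestamp);
    }

    // Mirrors the counting step in the diff: every record at a lower index is newer and
    // is removed in full; the found record is also removed if it starts at the timestamp.
    static int fullRecordsToTruncate(final List<Version> reversed, final long timestamp) {
        final int index = find(reversed, timestamp);
        int count = index;
        if (reversed.get(index).validFrom == timestamp) {
            count++;
        }
        return count;
    }

    // The segment from the example: [20, 30) at index 0 and [10, 20) at index 1.
    static List<Version> exampleSegment() {
        return Arrays.asList(new Version(20L, 30L), new Version(10L, 20L));
    }

    public static void main(final String[] args) {
        final List<Version> segment = exampleSegment();
        System.out.println(fullRecordsToTruncate(segment, 20L)); // truncate to 20
        System.out.println(fullRecordsToTruncate(segment, 25L)); // truncate to 25
    }
}
```

With this model, truncating to 20 yields a count of 1 and truncating to 25 yields 0, even though `find()` returns index 0 in both cases.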


