vcrfxia commented on code in PR #13364:
URL: https://github.com/apache/kafka/pull/13364#discussion_r1151060036


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
             return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp;
         }
 
+        private void truncateRecordsToTimestamp(final long timestamp) {
+            if (timestamp <= minTimestamp) {
+                // delete everything in this current segment by replacing it with a degenerate segment
+                initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp);
+                return;
+            }
+
+            final SegmentSearchResult searchResult = find(timestamp, false);
+            // all records with later timestamps should be removed
+            int fullRecordsToTruncate = searchResult.index();
+            // additionally remove the current record as well, if its validFrom equals the
+            // timestamp to truncate to
+            if (searchResult.validFrom() == timestamp) {
+                fullRecordsToTruncate++;
+            }
+
+            if (fullRecordsToTruncate == 0) {
+                // no records to remove; update nextTimestamp and return
+                nextTimestamp = timestamp;
+                ByteBuffer.wrap(segmentValue, 0, TIMESTAMP_SIZE).putLong(0, timestamp);
+                return;
+            }
+
+            final int valuesLengthToRemove = cumulativeValueSizes.get(fullRecordsToTruncate - 1);
+            final int timestampAndValueSizesLengthToRemove = (TIMESTAMP_SIZE + VALUE_SIZE) * fullRecordsToTruncate;
+            final int newSegmentLength = segmentValue.length - valuesLengthToRemove - timestampAndValueSizesLengthToRemove;
+            segmentValue = ByteBuffer.allocate(newSegmentLength)
+                .putLong(timestamp) // update nextTimestamp as part of truncation
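
As a quick, standalone illustration of the counting logic above (a hedged sketch only; the newest-first record order and the stand-in for `find()` are simplifications, not the real formatter code): with records whose validFrom timestamps are 30, 20, 10 and a truncation timestamp of 20, the record from 30 is removed because it is strictly newer, and the record from 20 is also removed because its validFrom equals the truncation timestamp, leaving only the record from 10.

```java
// Standalone sketch of the record-counting logic only; SegmentSearchResult,
// find(), and the byte layout are not reproduced here. Assumes validFroms
// are sorted newest-first, matching the reverse-sorted order used above.
import java.util.List;

public class TruncationCountSketch {

    static int fullRecordsToTruncate(final List<Long> validFroms, final long timestamp) {
        // stand-in for find(timestamp, false): locate the record whose
        // validity range contains the truncation timestamp
        int index = 0;
        while (index < validFroms.size() && validFroms.get(index) > timestamp) {
            index++;
        }
        // all records with later timestamps should be removed
        int toTruncate = index;
        // additionally remove the found record if its validFrom equals the timestamp
        if (index < validFroms.size() && validFroms.get(index) == timestamp) {
            toTruncate++;
        }
        return toTruncate;
    }

    public static void main(final String[] args) {
        // records with validFrom 30, 20, 10 (newest first), truncated to timestamp 20
        System.out.println(fullRecordsToTruncate(List.of(30L, 20L, 10L), 20L)); // prints 2
    }
}
```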

Review Comment:
   > but it also does not matter, because we actually have a put() at hand that will update next_timestamp right after we finished the cleanup again
   
   Yeah, that's correct. Even though `nextTimestamp` will be updated again immediately once we
   return from this method (back to `insertAsLatest()`), my intention in updating it here is to
   keep the state of the segment "consistent" at all times, which IMO makes the code easier to
   reason about. Another way to think about it: when `insertAsLatest()` detects an inconsistency,
   we respond by truncating everything in this segment up to `timestamp` (the timestamp of the
   record being inserted), and that truncation includes updating the segment's `nextTimestamp`
   to `timestamp`.
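
   To make that flow concrete, here is a minimal, hypothetical sketch (the `SimpleSegment` class,
   its fields, and the inconsistency check are illustrative simplifications, not the actual store
   code): truncation leaves the segment self-consistent on its own, and the subsequent insert then
   overwrites `nextTimestamp` again anyway.

   ```java
   // Hypothetical, simplified model of the flow described above; the class,
   // its fields, and the inconsistency check are illustrative only and do not
   // mirror the real RocksDBVersionedStoreSegmentValueFormatter internals.
   import java.util.ArrayList;
   import java.util.List;

   class SimpleSegment {
       // validFrom timestamps of the records in this segment, newest first
       private final List<Long> validFroms = new ArrayList<>();
       // validTo of the newest record in this segment
       private long nextTimestamp;

       // analogous in spirit to truncateRecordsToTimestamp(): drop all records
       // with validFrom >= timestamp and also move nextTimestamp down, so the
       // segment is self-consistent even before the caller's put happens
       void truncateRecordsToTimestamp(final long timestamp) {
           validFroms.removeIf(validFrom -> validFrom >= timestamp);
           nextTimestamp = timestamp;
       }

       // analogous in spirit to insertAsLatest(): if the segment claims a later
       // nextTimestamp than the record being inserted, truncate first, then put;
       // the put updates nextTimestamp again right after the cleanup
       void insertAsLatest(final long timestamp, final long validTo) {
           if (nextTimestamp > timestamp) {
               truncateRecordsToTimestamp(timestamp);
           }
           validFroms.add(0, timestamp);
           nextTimestamp = validTo;
       }
   }
   ```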
   
   Do you have thoughts on what additional comments (and where) would help clarify this? I had
   hoped that the method name `truncateRecordsToTimestamp`, together with this comment stating
   that we update `nextTimestamp` to `timestamp`, would be sufficient, but it appears not :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
