vcrfxia commented on code in PR #13364:
URL: https://github.com/apache/kafka/pull/13364#discussion_r1153892915


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
             return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp;
         }
 
+        private void truncateRecordsToTimestamp(final long timestamp) {
+            if (timestamp <= minTimestamp) {
+                // delete everything in this current segment by replacing it with a degenerate segment
+                initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp);
+                return;
+            }
+
+            final SegmentSearchResult searchResult = find(timestamp, false);
+            // all records with later timestamps should be removed
+            int fullRecordsToTruncate = searchResult.index();
+            // additionally remove the current record as well, if its validFrom equals the
+            // timestamp to truncate to
+            if (searchResult.validFrom() == timestamp) {
+                fullRecordsToTruncate++;
+            }
+
+            if (fullRecordsToTruncate == 0) {
+                // no records to remove; update nextTimestamp and return
+                nextTimestamp = timestamp;
+                ByteBuffer.wrap(segmentValue, 0, TIMESTAMP_SIZE).putLong(0, timestamp);
+                return;
+            }
+
+            final int valuesLengthToRemove = cumulativeValueSizes.get(fullRecordsToTruncate - 1);
+            final int timestampAndValueSizesLengthToRemove = (TIMESTAMP_SIZE + VALUE_SIZE) * fullRecordsToTruncate;
+            final int newSegmentLength = segmentValue.length - valuesLengthToRemove - timestampAndValueSizesLengthToRemove;
+            segmentValue = ByteBuffer.allocate(newSegmentLength)
+                .putLong(timestamp) // update nextTimestamp as part of truncation

Review Comment:
   I opted to add a javadoc to the method -- LMK whether you think it clears up 
the confusion here. 
   
   I think it would be too granular to put the explanation on this line itself,
   because there's another place in the method (the first if-condition, where we
   replace the entire segment with a degenerate one if the truncation timestamp
   is less than or equal to the segment's minTimestamp) where readers are likely
   to have the same question if it's not already clear that nextTimestamp will
   be updated to the truncation timestamp.
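
For anyone following along, here is a rough sketch of the point above: both paths through the method leave nextTimestamp equal to the truncation timestamp. It is not the formatter's implementation. `SegmentSketch` and `Version` are made-up names, and the segment is modeled as a plain newest-first list instead of the packed byte array, assuming the truncation timestamp is never later than the segment's current nextTimestamp.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model of a segment; the real class packs this into a byte[].
final class SegmentSketch {

    // One record version; a null value stands in for a tombstone.
    static final class Version {
        final long validFrom;
        final String value;

        Version(final long validFrom, final String value) {
            this.validFrom = validFrom;
            this.value = value;
        }
    }

    private final List<Version> versions = new ArrayList<>(); // newest first
    private long nextTimestamp; // end of the segment's validity range
    private long minTimestamp;  // validFrom of the oldest record

    SegmentSketch(final long nextTimestamp, final List<Version> newestFirst) {
        this.nextTimestamp = nextTimestamp;
        this.versions.addAll(newestFirst);
        this.minTimestamp = newestFirst.get(newestFirst.size() - 1).validFrom;
    }

    void truncateRecordsToTimestamp(final long timestamp) {
        if (timestamp <= minTimestamp) {
            // Nothing survives: replace the contents with a degenerate segment
            // holding a single tombstone whose validFrom equals nextTimestamp.
            versions.clear();
            versions.add(new Version(timestamp, null));
            minTimestamp = timestamp;
        } else {
            // Drop every record whose validFrom is at or after the truncation timestamp.
            versions.removeIf(v -> v.validFrom >= timestamp);
        }
        // In both branches, nextTimestamp becomes the truncation timestamp.
        nextTimestamp = timestamp;
    }

    long nextTimestamp() { return nextTimestamp; }
    long minTimestamp() { return minTimestamp; }
    int size() { return versions.size(); }
    Version newest() { return versions.get(0); }

    public static void main(final String[] args) {
        final SegmentSketch segment = new SegmentSketch(40L, Arrays.asList(
            new Version(30L, "c"), new Version(20L, "b"), new Version(10L, "a")));

        segment.truncateRecordsToTimestamp(20L); // partial truncation
        System.out.println(segment.nextTimestamp() + " " + segment.size()); // prints "20 1"

        segment.truncateRecordsToTimestamp(5L); // at or below minTimestamp: degenerate segment
        System.out.println(segment.nextTimestamp() + " " + segment.newest().value); // prints "5 null"
    }
}
```

The single assignment at the end of the sketch is the behavior the javadoc is meant to spell out, which is why documenting it at the method level covers both branches.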


