vcrfxia commented on code in PR #13364:
URL: https://github.com/apache/kafka/pull/13364#discussion_r1153696299


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
             return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp;
         }
 
+        private void truncateRecordsToTimestamp(final long timestamp) {
+            if (timestamp <= minTimestamp) {
+                // delete everything in this current segment by replacing it with a degenerate segment
+                initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp);
+                return;
+            }
+
+            final SegmentSearchResult searchResult = find(timestamp, false);
+            // all records with later timestamps should be removed
+            int fullRecordsToTruncate = searchResult.index();
+            // additionally remove the current record as well, if its validFrom equals the
+            // timestamp to truncate to
+            if (searchResult.validFrom() == timestamp) {
+                fullRecordsToTruncate++;
+            }
+
+            if (fullRecordsToTruncate == 0) {
+                // no records to remove; update nextTimestamp and return

Review Comment:
   That can never happen. When a single `put()` call requires writing to
multiple segments, it always writes to the latest record in the older segment
and the earliest record in the newer segment. If `[5,10)` is a partial write,
then it was written as the latest record of an older segment, and the newer
segment already has a record with that same timestamp 5 as its earliest record
(that record was supposed to be replaced with a new one, but the second write
failed). In that case, any subsequent write with a timestamp greater than 5
will stop in the newer segment, because the newer segment still has
minTimestamp = 5.
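
To make the argument concrete, here is a minimal sketch of the segment search, not the actual store code. It assumes the search walks segments from newest to oldest and stops at the first segment whose minTimestamp is less than or equal to the write's timestamp. `PartialWriteSketch`, `Segment`, and `segmentWherePutStops` are hypothetical names made up for illustration, and the exact comparison used by the real implementation may differ.

```java
import java.util.ArrayList;
import java.util.List;

public class PartialWriteSketch {

    // Hypothetical, simplified segment: only its name and minTimestamp matter here.
    static final class Segment {
        final String name;
        final long minTimestamp;

        Segment(final String name, final long minTimestamp) {
            this.name = name;
            this.minTimestamp = minTimestamp;
        }
    }

    // Assumed search rule: walk segments newest-first and stop at the first one
    // whose minTimestamp is <= the timestamp being written.
    static String segmentWherePutStops(final List<Segment> newestFirst, final long timestamp) {
        for (final Segment segment : newestFirst) {
            if (segment.minTimestamp <= timestamp) {
                return segment.name;
            }
        }
        return "none";
    }

    public static void main(final String[] args) {
        // State after the partial write: the older segment received [5,10) as its
        // latest record, and the newer segment still holds its earliest record at
        // timestamp 5, so its minTimestamp is still 5.
        final List<Segment> newestFirst = new ArrayList<>();
        newestFirst.add(new Segment("newer", 5L));
        newestFirst.add(new Segment("older", 0L));

        // A later write with timestamp 7 stops in the newer segment and never
        // reaches the partially written record in the older segment.
        System.out.println(segmentWherePutStops(newestFirst, 7L));
        // A write with timestamp 3 passes the newer segment and lands in the older one.
        System.out.println(segmentWherePutStops(newestFirst, 3L));
    }
}
```

Under these assumptions, only writes with a timestamp below 5 ever look at the older segment, which is why the truncation path cannot see the partial record for a later timestamp.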



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

