vcrfxia commented on code in PR #13364:
URL: https://github.com/apache/kafka/pull/13364#discussion_r1151053147


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
             return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp;
         }
 
+        private void truncateRecordsToTimestamp(final long timestamp) {
+            if (timestamp <= minTimestamp) {
+                // delete everything in this current segment by replacing it with a degenerate segment
+                initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp);
+                return;
+            }
+
+            final SegmentSearchResult searchResult = find(timestamp, false);
+            // all records with later timestamps should be removed
+            int fullRecordsToTruncate = searchResult.index();
+            // additionally remove the current record as well, if its validFrom equals the
+            // timestamp to truncate to
+            if (searchResult.validFrom() == timestamp) {
+                fullRecordsToTruncate++;
+            }
+
+            if (fullRecordsToTruncate == 0) {
+                // no records to remove; update nextTimestamp and return

Review Comment:
   This catches the situation in which the last record of the segment being 
inserted into has `validFrom=5` and `validTo=10`, and `insertAsLatest()` has 
been called with timestamp `8` (more generally, any timestamp larger than 5 and 
smaller than 10). In this situation, all we have to do to recover (before 
performing the insert) is to update `validTo` for the existing record from 10 
to 8.
   
   As noted in a comment above, I don't actually expect this situation to be 
hit, but I think it is good to be defensive about these additional edge cases 
in this recovery logic, as it is difficult to anticipate the types of failures 
that may be encountered.
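   
   To make the example above concrete, here is a minimal sketch of the 
recovery step. It assumes a simplified in-memory model: the `TruncateSketch` 
and `Rec` names are hypothetical, the list stands in for the packed byte 
format the real formatter uses, and the degenerate-segment branch is not 
modelled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a segment. The real formatter packs
// records into a byte array, which this sketch does not reproduce.
public class TruncateSketch {

    // One record version, valid over [validFrom, validTo).
    static final class Rec {
        final long validFrom;
        long validTo;

        Rec(final long validFrom, final long validTo) {
            this.validFrom = validFrom;
            this.validTo = validTo;
        }
    }

    // Records are ordered by ascending validFrom; the last element is the latest.
    // Removes every record whose validFrom is at or after the timestamp, then
    // caps validTo of the remaining latest record at the timestamp.
    // Returns the number of records removed.
    static int truncateToTimestamp(final List<Rec> records, final long timestamp) {
        int removed = 0;
        while (!records.isEmpty()
                && records.get(records.size() - 1).validFrom >= timestamp) {
            records.remove(records.size() - 1);
            removed++;
        }
        if (!records.isEmpty()) {
            final Rec latest = records.get(records.size() - 1);
            if (latest.validTo > timestamp) {
                // the "no records to remove" case: only validTo needs updating
                latest.validTo = timestamp;
            }
        }
        return removed;
    }

    public static void main(final String[] args) {
        final List<Rec> records = new ArrayList<>();
        records.add(new Rec(5, 10));
        final int removed = truncateToTimestamp(records, 8);
        // prints "0 5 8": nothing removed, validTo moved from 10 to 8
        System.out.println(removed + " " + records.get(0).validFrom
                + " " + records.get(0).validTo);
    }
}
```

   With `validFrom=5`, `validTo=10` and a truncation timestamp of `8`, no 
record is dropped and only `validTo` changes, which is the case the 
`fullRecordsToTruncate == 0` branch is meant to cover.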


