mjsax commented on code in PR #13364:
URL: https://github.com/apache/kafka/pull/13364#discussion_r1152737975


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
             return unpackedReversedTimestampAndValueSizes.get(index).timestamp 
== minTimestamp;
         }
 
+        private void truncateRecordsToTimestamp(final long timestamp) {
+            if (timestamp <= minTimestamp) {
+                // delete everything in this current segment by replacing it 
with a degenerate segment
+                initializeWithRecord(new ValueAndValueSize(null), timestamp, 
timestamp);
+                return;
+            }
+
+            final SegmentSearchResult searchResult = find(timestamp, false);
+            // all records with later timestamps should be removed
+            int fullRecordsToTruncate = searchResult.index();
+            // additionally remove the current record as well, if its 
validFrom equals the
+            // timestamp to truncate to
+            if (searchResult.validFrom() == timestamp) {
+                fullRecordsToTruncate++;
+            }
+
+            if (fullRecordsToTruncate == 0) {
+                // no records to remove; update nextTimestamp and return

Review Comment:
   But if we handle this case, don't we incorrectly drop `[10,15)`? The 
intention if my example was to say that `[5,10)` was the partial write, but 
`[10,15)` was a clean write later (due to re-ordering) that did not detect the 
corruption and went through cleanly, and now inserting `8` would incorrectly 
purge `[10,15)`?
   
   Or maybe this case could never happen? -- Just want to double check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to