vcrfxia commented on code in PR #13364:
URL: https://github.com/apache/kafka/pull/13364#discussion_r1151055264


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
             return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp;
         }
 
+        private void truncateRecordsToTimestamp(final long timestamp) {
+            if (timestamp <= minTimestamp) {
+                // delete everything in this current segment by replacing it with a degenerate segment
+                initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp);
+                return;
+            }
+
+            final SegmentSearchResult searchResult = find(timestamp, false);
+            // all records with later timestamps should be removed
+            int fullRecordsToTruncate = searchResult.index();

Review Comment:
   See above. I also can't think of a situation in which we'd need to recover
by doing anything other than removing the current last record version in full,
but I think it is good to be defensive. The current understanding that this is
the only recovery scenario needed rests on a number of factors (the fact that
this recovery is only performed in `insertAsLatest()`, the way the store
implementation uses `insertAsLatest()`, etc.), any of which could change in
the future. I think it'd be good to future-proof this recovery logic in case
something does change, even though we don't expect it to.
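
   For readers following along, below is a minimal, hypothetical sketch of the
truncate-to-timestamp idea the diff implements, written against a plain
in-memory list rather than the packed byte layout the real
`RocksDBVersionedStoreSegmentValueFormatter` uses. The class and method names,
and the exact >= boundary semantics, are assumptions for illustration only.

    import java.util.ArrayList;
    import java.util.List;

    // Simplified, hypothetical sketch of "truncate record versions to a timestamp".
    // This is NOT the packed-bytes segment format used by
    // RocksDBVersionedStoreSegmentValueFormatter; names and boundary semantics
    // here are illustrative only.
    final class VersionedRecordsSketch {

        static final class TimestampedValue {
            final long timestamp;
            final byte[] value;

            TimestampedValue(final long timestamp, final byte[] value) {
                this.timestamp = timestamp;
                this.value = value;
            }
        }

        // record versions kept newest-first, mirroring the reverse ordering
        // the segment format uses
        private final List<TimestampedValue> records = new ArrayList<>();
        private long minTimestamp;

        VersionedRecordsSketch(final long minTimestamp) {
            this.minTimestamp = minTimestamp;
        }

        void putLatest(final long timestamp, final byte[] value) {
            records.add(0, new TimestampedValue(timestamp, value));
        }

        // Drop every record version whose timestamp is at or after the cutoff.
        // The degenerate branch mirrors the `timestamp <= minTimestamp` check
        // in the diff: when nothing in the segment can survive, reset to an
        // empty state.
        void truncateToTimestamp(final long timestamp) {
            if (timestamp <= minTimestamp) {
                records.clear();
                minTimestamp = timestamp;
                return;
            }
            // newest-first ordering means affected versions sit at the front
            while (!records.isEmpty() && records.get(0).timestamp >= timestamp) {
                records.remove(0);
            }
        }
    }

   The sketch only shows the two cases discussed above (the degenerate
"nothing survives" branch and the ordinary front-of-list truncation); it makes
no claim about the real recovery semantics beyond what the diff itself shows.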


