vcrfxia commented on code in PR #13364: URL: https://github.com/apache/kafka/pull/13364#discussion_r1151055264
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ########## @@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) { return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp; } + private void truncateRecordsToTimestamp(final long timestamp) { + if (timestamp <= minTimestamp) { + // delete everything in this current segment by replacing it with a degenerate segment + initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp); + return; + } + + final SegmentSearchResult searchResult = find(timestamp, false); + // all records with later timestamps should be removed + int fullRecordsToTruncate = searchResult.index(); Review Comment: See above. I also can't think of situations in which we'd need to recovery by doing anything other than removing the current last record version in full, but I think it is good to be defensive. The current understanding that this is the only recovery scenario which is needed is based on a lot of factors (the fact that this recovery is only performed on `insertAsLatest()`, usage of `insertAsLatest()` from the store implementation, etc) which could potentially change in the future. I think it'd be good to future-proof this recovery logic in the event that something changes in the future, even though we do not expect that to be the case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org