vcrfxia commented on code in PR #13364: URL: https://github.com/apache/kafka/pull/13364#discussion_r1153888408
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
             return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp;
         }
 
+        private void truncateRecordsToTimestamp(final long timestamp) {
+            if (timestamp <= minTimestamp) {
+                // delete everything in this current segment by replacing it with a degenerate segment
+                initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp);
+                return;
+            }
+
+            final SegmentSearchResult searchResult = find(timestamp, false);
+            // all records with later timestamps should be removed
+            int fullRecordsToTruncate = searchResult.index();

Review Comment:
   Thanks for the discussion, everyone! I chatted with Matthias, and we've decided not to throw an IllegalStateException or a recoverable exception. Instead, we'll log an extra warning if more (or fewer) than a single record is truncated as part of this recovery logic. That would indicate either nondeterministic processing or that something in the code has changed and violated our current understanding that only one record should be truncated during recovery.

   As Guozhang noted, it's quite difficult to write tests ensuring that all the existing properties that guarantee the safety of this truncation today are preserved going forward, since there are many of them and they all interact. We already have pretty thorough unit test coverage on the outer classes. The only extra testing I can think to add would be tests that explicitly simulate partially failed updates and verify recovery. That might be overkill, but I can do it in a follow-up PR if we think it's worth it.
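To make the agreed behavior concrete, here is a minimal Java sketch of "truncate, count, and warn instead of throw". It is not the Kafka implementation: `SegmentTruncationSketch`, `TimestampedRecord`, and `addOlderRecord` are hypothetical names; records live in a plain newest-first list rather than the packed segment byte format; the inclusive `>=` cutoff is an assumption (chosen to agree with the diff's `timestamp <= minTimestamp` branch clearing the whole segment); and `java.util.logging` stands in for the project's actual logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

/**
 * Hypothetical, simplified model of one segment of a versioned store.
 * Records are kept newest-first, mirroring the ordering implied by
 * isLastIndex() in the diff (the last index holds minTimestamp).
 */
final class SegmentTruncationSketch {
    private static final Logger LOG = Logger.getLogger(SegmentTruncationSketch.class.getName());

    /** A single versioned record; the value may be null (a tombstone). */
    static final class TimestampedRecord {
        final long timestamp;
        final byte[] value;

        TimestampedRecord(final long timestamp, final byte[] value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Newest record first, oldest (minTimestamp) last.
    private final List<TimestampedRecord> records = new ArrayList<>();

    /** Appends a record; callers must add records in strictly descending timestamp order. */
    void addOlderRecord(final TimestampedRecord record) {
        if (!records.isEmpty() && records.get(records.size() - 1).timestamp <= record.timestamp) {
            throw new IllegalArgumentException("records must be added newest-first");
        }
        records.add(record);
    }

    int size() {
        return records.size();
    }

    /**
     * Removes every record whose timestamp is >= the given timestamp (assumed boundary)
     * and returns how many were removed. Rather than throwing, it only logs a warning
     * when the count is not exactly one, since recovering from a single partially
     * failed update is expected to truncate exactly one record.
     */
    int truncateRecordsToTimestamp(final long timestamp) {
        int truncated = 0;
        // The newest records sit at the front, so drop from the front while they are too new.
        while (!records.isEmpty() && records.get(0).timestamp >= timestamp) {
            records.remove(0);
            truncated++;
        }
        if (truncated != 1) {
            LOG.warning("Truncated " + truncated + " record(s) at timestamp " + timestamp
                + " during recovery; expected exactly 1. This may indicate nondeterministic"
                + " processing or a violated invariant.");
        }
        return truncated;
    }
}
```

For example, with records at timestamps 30, 20, and 10, truncating to 25 removes only the record at 30 and logs nothing; a later truncation to 5 removes the remaining two and logs the warning but still succeeds. The real code replaces a fully truncated segment with a degenerate segment and locates the cut point via `find(timestamp, false)`; neither detail is modeled here.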