mjsax commented on code in PR #13364: URL: https://github.com/apache/kafka/pull/13364#discussion_r1151299746
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ########## @@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) { return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp; } + private void truncateRecordsToTimestamp(final long timestamp) { + if (timestamp <= minTimestamp) { + // delete everything in this current segment by replacing it with a degenerate segment + initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp); + return; + } + + final SegmentSearchResult searchResult = find(timestamp, false); + // all records with later timestamps should be removed + int fullRecordsToTruncate = searchResult.index(); + // additionally remove the current record as well, if its validFrom equals the + // timestamp to truncate to + if (searchResult.validFrom() == timestamp) { + fullRecordsToTruncate++; + } + + if (fullRecordsToTruncate == 0) { + // no records to remove; update nextTimestamp and return + nextTimestamp = timestamp; + ByteBuffer.wrap(segmentValue, 0, TIMESTAMP_SIZE).putLong(0, timestamp); + return; + } + + final int valuesLengthToRemove = cumulativeValueSizes.get(fullRecordsToTruncate - 1); + final int timestampAndValueSizesLengthToRemove = (TIMESTAMP_SIZE + VALUE_SIZE) * fullRecordsToTruncate; + final int newSegmentLength = segmentValue.length - valuesLengthToRemove - timestampAndValueSizesLengthToRemove; + segmentValue = ByteBuffer.allocate(newSegmentLength) + .putLong(timestamp) // update nextTimestamp as part of truncation Review Comment: Yeah -- not 100% sure -- guess my idea was just to avoid potential confusion if somebody read this code in the future (but also always tricky to not confuse readers with too much detail). Maybe something like: ``` // update nextTimestamp as part of truncation -- this a surrogate value that just fixed inconsistency without being actually correct; but we have a pending put() in place that will set the correct value later ``` Just an idea. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org