mjsax commented on code in PR #13364:
URL: https://github.com/apache/kafka/pull/13364#discussion_r1149989283
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########

@@ -341,8 +345,10 @@ public void insertAsLatest(final long validFrom, final long validTo, final byte[
         // detected inconsistency edge case where older segment has [a,b) while newer store
         // has [a,c), due to [b,c) having failed to write to newer store.
         // remove entries from this store until the overlap is resolved.

Review Comment:
   Think we can improve this existing comment?
   ```
   // Detected inconsistency edge case for a partial update:
   // We always insert based on `validTo`, and we would only have a "double write"
   // if an existing record moves from a newer segment into an older segment
   // (because its `validTo` timestamp changes due to the newly put record).
   // For this case, we first move the existing record from the newer to the older segment and
   // afterwards update the newer segment with the new record.
   // If only the first part, i.e., moving the existing record into the older segment, succeeded,
   // we end up with a "valid interval" overlap (and a corrupted older segment) that we need to resolve:
   //  - the older segment's latest record: [oldValue, old-validFrom, updated-validTo)
   //    (this is incorrect because the dual write was not completed)
   //  - the newer segment's oldest record: [oldValue, old-validFrom, old-validTo)
   //    (this is correct, as it was never modified)
   //
   // We have the current older segment at hand and need to truncate the partial write,
   // i.e., remove [oldValue, old-validFrom, updated-validTo) from it.
   ```
   Is this correct?
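The "valid interval" overlap described in the comment above can be illustrated with a small, hypothetical model. The `VersionedRecord` type, the `hasOverlap` helper, and the concrete timestamps (10, 15, 20) below are illustrative assumptions, not code from the PR; the overlap condition is inferred from the original comment's "[a,b) vs. [a,c)" description:

```java
public class OverlapSketch {
    // Hypothetical simplified model: a value valid over [validFrom, validTo).
    record VersionedRecord(String value, long validFrom, long validTo) {}

    // In a healthy store, the older segment's latest record ends exactly where the
    // newer segment's oldest record begins. Any overlap signals an incomplete dual write.
    static boolean hasOverlap(final VersionedRecord olderLatest, final VersionedRecord newerOldest) {
        return newerOldest.validFrom() < olderLatest.validTo();
    }

    public static void main(final String[] args) {
        // Step 1 succeeded: [oldValue, old-validFrom=10, updated-validTo=15)
        // was moved into the older segment ...
        final VersionedRecord olderLatest = new VersionedRecord("oldValue", 10L, 15L);
        // ... but step 2 failed: the newer segment still holds the unmodified
        // [oldValue, old-validFrom=10, old-validTo=20).
        final VersionedRecord newerOldest = new VersionedRecord("oldValue", 10L, 20L);
        System.out.println(hasOverlap(olderLatest, newerOldest)); // true -> truncate the older segment
    }
}
```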
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########

@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
         return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp;
     }

     private void truncateRecordsToTimestamp(final long timestamp) {
         if (timestamp <= minTimestamp) {
             // delete everything in this current segment by replacing it with a degenerate segment
             initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp);
             return;
         }

         final SegmentSearchResult searchResult = find(timestamp, false);
         // all records with later timestamps should be removed
         int fullRecordsToTruncate = searchResult.index();
         // additionally remove the current record as well, if its validFrom equals the
         // timestamp to truncate to
         if (searchResult.validFrom() == timestamp) {
             fullRecordsToTruncate++;

Review Comment:
   Not sure if I understand? `find()` is inclusive, right? It would return a record with `validFrom == timestamp` as the search result? The JavaDocs say "not exceeding":
   ```
   Finds the latest record in this segment row with (validFrom) timestamp not exceeding the provided timestamp bound.
   ```
   And the code inside `find()` says:
   ```
   if (currTimestamp <= timestamp) {
       // found result
   ```

########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########

@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
         return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp;
     }

     private void truncateRecordsToTimestamp(final long timestamp) {
         if (timestamp <= minTimestamp) {
             // delete everything in this current segment by replacing it with a degenerate segment
             initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp);
             return;
         }

         final SegmentSearchResult searchResult = find(timestamp, false);
         // all records with later timestamps should be removed
         int fullRecordsToTruncate = searchResult.index();
         // additionally remove the current record as well, if its validFrom equals the
         // timestamp to truncate to
         if (searchResult.validFrom() == timestamp) {
             fullRecordsToTruncate++;
         }

         if (fullRecordsToTruncate == 0) {
             // no records to remove; update nextTimestamp and return

Review Comment:
   Not sure if I understand this condition?
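The inclusive bound quoted from `find()` above can be sketched with a tiny, hypothetical stand-in. Timestamps are stored newest-first, as in the segment row; `findIndex` below is an illustrative simplification, not the real method:

```java
import java.util.List;

public class FindSketch {
    // Hypothetical stand-in for find(): entries are just validFrom timestamps,
    // stored newest-first; return the index of the latest entry whose
    // validFrom does not exceed the bound.
    static int findIndex(final List<Long> validFromsNewestFirst, final long timestamp) {
        for (int index = 0; index < validFromsNewestFirst.size(); index++) {
            final long currTimestamp = validFromsNewestFirst.get(index);
            if (currTimestamp <= timestamp) { // "not exceeding": the bound is inclusive
                return index; // found result
            }
        }
        return -1; // bound is older than everything in this segment
    }

    public static void main(final String[] args) {
        final List<Long> validFroms = List.of(30L, 20L, 10L); // newest first
        // an entry with validFrom == timestamp is itself returned (inclusive):
        System.out.println(findIndex(validFroms, 20L)); // prints 1
        System.out.println(findIndex(validFroms, 25L)); // also prints 1: latest validFrom <= 25
    }
}
```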
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########

@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
         return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp;
     }

     private void truncateRecordsToTimestamp(final long timestamp) {
         if (timestamp <= minTimestamp) {
             // delete everything in this current segment by replacing it with a degenerate segment
             initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp);
             return;
         }

         final SegmentSearchResult searchResult = find(timestamp, false);
         // all records with later timestamps should be removed
         int fullRecordsToTruncate = searchResult.index();
         // additionally remove the current record as well, if its validFrom equals the
         // timestamp to truncate to
         if (searchResult.validFrom() == timestamp) {
             fullRecordsToTruncate++;
         }

         if (fullRecordsToTruncate == 0) {
             // no records to remove; update nextTimestamp and return
             nextTimestamp = timestamp;
             ByteBuffer.wrap(segmentValue, 0, TIMESTAMP_SIZE).putLong(0, timestamp);
             return;
         }

         final int valuesLengthToRemove = cumulativeValueSizes.get(fullRecordsToTruncate - 1);
         final int timestampAndValueSizesLengthToRemove = (TIMESTAMP_SIZE + VALUE_SIZE) * fullRecordsToTruncate;
         final int newSegmentLength = segmentValue.length - valuesLengthToRemove - timestampAndValueSizesLengthToRemove;
         segmentValue = ByteBuffer.allocate(newSegmentLength)
             .putLong(timestamp) // update nextTimestamp as part of truncation

Review Comment:
   I think this is "strictly" not correct: we lost the original `validTo` / `next_timestamp` that was overwritten by moving a record into this segment in the partial write. But it also does not matter, because we actually have a put() at hand that will update `next_timestamp` right after we finish the cleanup, right? (Not sure if we should document this with a comment?)
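The size arithmetic in the truncation above can be checked with a small, hypothetical sketch. The constants mirror the diff, but `newSegmentLength` and the concrete sizes are illustrative assumptions, not the PR's code:

```java
import java.nio.ByteBuffer;

public class TruncateSketch {
    // Constants mirroring the segment layout in the diff.
    static final int TIMESTAMP_SIZE = Long.BYTES;  // 8
    static final int VALUE_SIZE = Integer.BYTES;   // 4

    // Hypothetical helper: the truncated segment drops the removed records'
    // value bytes plus one (timestamp, valueSize) header pair per record.
    static int newSegmentLength(final int oldLength,
                                final int valuesLengthToRemove,
                                final int fullRecordsToTruncate) {
        final int headersToRemove = (TIMESTAMP_SIZE + VALUE_SIZE) * fullRecordsToTruncate;
        return oldLength - valuesLengthToRemove - headersToRemove;
    }

    public static void main(final String[] args) {
        // e.g. a 100-byte segment, dropping one record with a 10-byte value:
        final int len = newSegmentLength(100, 10, 1);
        System.out.println(len); // 100 - 10 - 12 = 78
        // the truncation timestamp becomes the new nextTimestamp in the first 8 bytes:
        final ByteBuffer rebuilt = ByteBuffer.allocate(len).putLong(50L);
        System.out.println(rebuilt.getLong(0)); // 50
    }
}
```

This is also where the original `next_timestamp` is lost, as the review comment above notes: the new buffer starts with the truncation timestamp rather than the pre-corruption value.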
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########

@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
         return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp;
     }

     private void truncateRecordsToTimestamp(final long timestamp) {
         if (timestamp <= minTimestamp) {
             // delete everything in this current segment by replacing it with a degenerate segment
             initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp);
             return;
         }

         final SegmentSearchResult searchResult = find(timestamp, false);
         // all records with later timestamps should be removed
         int fullRecordsToTruncate = searchResult.index();

Review Comment:
   Don't we know that we only need to remove the oldest entry, at index 0? For what case would we need to remove more than one entry/version?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org