mjsax commented on code in PR #13364:
URL: https://github.com/apache/kafka/pull/13364#discussion_r1149989283


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -341,8 +345,10 @@ public void insertAsLatest(final long validFrom, final long validTo, final byte[
                 // detected inconsistency edge case where older segment has [a,b) while newer store
                 // has [a,c), due to [b,c) having failed to write to newer store.
                 // remove entries from this store until the overlap is resolved.

Review Comment:
   Think we can improve this existing comment?
   
   ```
   // Detected inconsistency edge case for partial update:
   // We always insert based on `validTo`, and we would only have a "double write"
   // if an existing record moves from a newer segment into an older segment
   // (because its `validTo` timestamp changes due to the newly put record).
   // For this case, we first move the existing record from the newer to the older segment and
   // afterwards update the newer segment with the new record.
   // If only the first part, i.e., moving the existing record into the older segment, succeeded,
   // we end up with a "valid interval" overlap (and a corrupted older segment) that we need to resolve:
   //  - older segment's latest record: [oldValue, old-validFrom, updated-validTo)
   //    (this is incorrect because the dual write was not completed)
   //  - newer segment's oldest record: [oldValue, old-validFrom, old-validTo)
   //    (this is correct, as it was never modified)
   //
   // We have the current older segment at hand, and need to truncate the partial write,
   // i.e., remove [oldValue, old-validFrom, updated-validTo) from it.
   ```
   
   Is this correct?
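   
   For illustration only (the `ValidInterval` record and all numbers below are mine, not from the PR), a minimal self-contained sketch of the overlap this comment describes:
   ```java
   // Hypothetical sketch of the state after the first half of the dual write
   // succeeded but the second half failed.
   record ValidInterval(long validFrom, long validTo) {}

   public class OverlapSketch {
       public static void main(final String[] args) {
           // step 1 succeeded: the record was moved into the older segment
           // with its validTo already updated for the new record
           final ValidInterval olderSegmentLatest = new ValidInterval(10L, 30L); // [old-validFrom, updated-validTo)

           // step 2 failed: the newer segment still holds the unmodified record
           final ValidInterval newerSegmentOldest = new ValidInterval(10L, 20L); // [old-validFrom, old-validTo)

           // the same record now exists in both segments, and the older copy
           // claims validity past the newer segment's start of history; the fix
           // is to truncate the partial write out of the older segment
           final boolean overlap = olderSegmentLatest.validTo() > newerSegmentOldest.validTo();
           System.out.println("needs truncation: " + overlap); // true
       }
   }
   ```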



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
             return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp;
         }
 
+        private void truncateRecordsToTimestamp(final long timestamp) {
+            if (timestamp <= minTimestamp) {
+                // delete everything in this current segment by replacing it with a degenerate segment
+                initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp);
+                return;
+            }
+
+            final SegmentSearchResult searchResult = find(timestamp, false);
+            // all records with later timestamps should be removed
+            int fullRecordsToTruncate = searchResult.index();
+            // additionally remove the current record as well, if its validFrom equals the
+            // timestamp to truncate to
+            if (searchResult.validFrom() == timestamp) {
+                fullRecordsToTruncate++;

Review Comment:
   Not sure if I understand? `find()` is inclusive, right? It would return a record with `validFrom == timestamp` as search result? JavaDocs say "not exceeding":
   ```
   Finds the latest record in this segment row with (validFrom) timestamp not exceeding the
   provided timestamp bound.
   ```
   And the code inside `find()` says:
   ```
   if (currTimestamp <= timestamp) {
       // found result
   ```
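   
   To make the boundary case concrete, here is a hypothetical stand-alone sketch of the index arithmetic (the helper name and the newest-first array are my assumptions, not the store's actual layout), assuming `find()` is inclusive as quoted above:
   ```java
   // Hypothetical sketch: records ordered newest-first by validFrom, as the
   // reversed unpacking above suggests. Returns how many full records an
   // inclusive find() implies should be truncated.
   static int recordsToTruncate(final long[] validFromsNewestFirst, final long timestamp) {
       for (int index = 0; index < validFromsNewestFirst.length; index++) {
           final long validFrom = validFromsNewestFirst[index];
           if (validFrom <= timestamp) {
               // records at indices < index have validFrom > timestamp and must go;
               // the found record itself must also go if it starts exactly at the
               // truncation point, which is the ++ being discussed
               return validFrom == timestamp ? index + 1 : index;
           }
       }
       // timestamp <= minTimestamp is handled separately via the degenerate segment
       throw new IllegalStateException("timestamp precedes all records");
   }
   ```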



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
             return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp;
         }
 
+        private void truncateRecordsToTimestamp(final long timestamp) {
+            if (timestamp <= minTimestamp) {
+                // delete everything in this current segment by replacing it with a degenerate segment
+                initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp);
+                return;
+            }
+
+            final SegmentSearchResult searchResult = find(timestamp, false);
+            // all records with later timestamps should be removed
+            int fullRecordsToTruncate = searchResult.index();
+            // additionally remove the current record as well, if its validFrom equals the
+            // timestamp to truncate to
+            if (searchResult.validFrom() == timestamp) {
+                fullRecordsToTruncate++;
+            }
+
+            if (fullRecordsToTruncate == 0) {
+                // no records to remove; update nextTimestamp and return

Review Comment:
   Not sure if I understand this condition?
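   
   If I read it right, the condition covers the case where the newest record starts strictly before the truncation point; reusing the hypothetical `recordsToTruncate(...)` sketch from my comment above:
   ```java
   // newest record has validFrom 10, strictly before the truncation point 15:
   // no full record is removed, only nextTimestamp must shrink to 15
   final long[] validFromsNewestFirst = {10L, 5L, 1L};
   final int toTruncate = recordsToTruncate(validFromsNewestFirst, 15L);
   // toTruncate == 0 -> take the "update nextTimestamp and return" branch
   ```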



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
             return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp;
         }
 
+        private void truncateRecordsToTimestamp(final long timestamp) {
+            if (timestamp <= minTimestamp) {
+                // delete everything in this current segment by replacing it with a degenerate segment
+                initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp);
+                return;
+            }
+
+            final SegmentSearchResult searchResult = find(timestamp, false);
+            // all records with later timestamps should be removed
+            int fullRecordsToTruncate = searchResult.index();
+            // additionally remove the current record as well, if its validFrom equals the
+            // timestamp to truncate to
+            if (searchResult.validFrom() == timestamp) {
+                fullRecordsToTruncate++;
+            }
+
+            if (fullRecordsToTruncate == 0) {
+                // no records to remove; update nextTimestamp and return
+                nextTimestamp = timestamp;
+                ByteBuffer.wrap(segmentValue, 0, TIMESTAMP_SIZE).putLong(0, timestamp);
+                return;
+            }
+
+            final int valuesLengthToRemove = cumulativeValueSizes.get(fullRecordsToTruncate - 1);
+            final int timestampAndValueSizesLengthToRemove = (TIMESTAMP_SIZE + VALUE_SIZE) * fullRecordsToTruncate;
+            final int newSegmentLength = segmentValue.length - valuesLengthToRemove - timestampAndValueSizesLengthToRemove;
+            segmentValue = ByteBuffer.allocate(newSegmentLength)
+                .putLong(timestamp) // update nextTimestamp as part of truncation

Review Comment:
   I think this is "strictly" not correct (we lose the original `validTo` / `next_timestamp` that was overwritten by moving a record into this segment during the partial write); but it also does not matter, because we actually have a put() at hand that will update `next_timestamp` right after we finish the cleanup, right? (Not sure if we should document this with a comment?)
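   
   A tiny self-contained illustration of why the lost header value should be harmless (the flow and the values are my reading of the recovery path, not the actual store code):
   ```java
   import java.nio.ByteBuffer;

   // Hypothetical sketch: next_timestamp passes through an "incorrect"
   // intermediate value during truncation, but the in-flight put() stamps
   // it again right away, so the intermediate value is never observed.
   public class NextTimestampSketch {
       public static void main(final String[] args) {
           final byte[] header = new byte[Long.BYTES];
           ByteBuffer.wrap(header).putLong(0, 30L); // updated-validTo left by the partial write
           ByteBuffer.wrap(header).putLong(0, 20L); // truncation stamps the truncation point
           ByteBuffer.wrap(header).putLong(0, 30L); // the follow-up put() stamps the new timestamp
           System.out.println(ByteBuffer.wrap(header).getLong(0)); // 30
       }
   }
   ```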



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
             return unpackedReversedTimestampAndValueSizes.get(index).timestamp 
== minTimestamp;
         }
 
+        private void truncateRecordsToTimestamp(final long timestamp) {
+            if (timestamp <= minTimestamp) {
+                // delete everything in this current segment by replacing it with a degenerate segment
+                initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp);
+                return;
+            }
+
+            final SegmentSearchResult searchResult = find(timestamp, false);
+            // all records with later timestamps should be removed
+            int fullRecordsToTruncate = searchResult.index();

Review Comment:
   Don't we know that we only need to remove the oldest entry at index 0? For what case would we need to remove more than one entry/version?
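   
   Reusing the hypothetical `recordsToTruncate(...)` sketch from my earlier comment: in the recovery scenario as I understand it, the truncation point equals the moved record's validFrom, so exactly one record (the entry at index 0) is dropped:
   ```java
   // the entry at index 0 is the partial write; truncating at its validFrom
   // removes it and nothing else
   final long[] validFromsNewestFirst = {10L, 5L, 1L};
   final int toTruncate = recordsToTruncate(validFromsNewestFirst, 10L);
   // toTruncate == 1 -> only the entry at index 0 is removed
   ```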



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
