vcrfxia commented on code in PR #13364:
URL: https://github.com/apache/kafka/pull/13364#discussion_r1151049820


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -341,8 +345,10 @@ public void insertAsLatest(final long validFrom, final 
long validTo, final byte[
                 // detected inconsistency edge case where older segment has 
[a,b) while newer store
                 // has [a,c), due to [b,c) having failed to write to newer 
store.
                 // remove entries from this store until the overlap is 
resolved.

Review Comment:
   Yep, that's spot on. I will update the comment with your suggestions.
   
   After a partial write failure is encountered, when the store next resumes 
processing then the store should first reprocess records which it has already 
processed previously (if EOS is not enabled), including the one during which 
the failure occurred, before processing any newly seen records. In this case, 
we would only expect a single record to be truncated (and to be truncated in 
full) as part of the recovery process -- as you've outlined above. The recovery 
code is more general, though, in that it also supports truncating multiple 
records and/or partial records, in case my above understanding about the types 
of failures which can occur is not accurate or changes in the future.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
             return unpackedReversedTimestampAndValueSizes.get(index).timestamp 
== minTimestamp;
         }
 
+        private void truncateRecordsToTimestamp(final long timestamp) {
+            if (timestamp <= minTimestamp) {
+                // delete everything in this current segment by replacing it 
with a degenerate segment
+                initializeWithRecord(new ValueAndValueSize(null), timestamp, 
timestamp);
+                return;
+            }
+
+            final SegmentSearchResult searchResult = find(timestamp, false);
+            // all records with later timestamps should be removed
+            int fullRecordsToTruncate = searchResult.index();
+            // additionally remove the current record as well, if its 
validFrom equals the
+            // timestamp to truncate to
+            if (searchResult.validFrom() == timestamp) {
+                fullRecordsToTruncate++;

Review Comment:
   Yes, `find()` is inclusive. That's also what "not exceeding" means (if the 
timestamps are equal, then neither exceeds the other), so I don't think there's 
a contradiction here? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to