This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b8e606355b7 KAFKA-14491: [4/N] Improvements to segment value format for RocksDB versioned store (#13186)
b8e606355b7 is described below

commit b8e606355b71528aa438e80ffeb3042d0d586998
Author: Victoria Xia <[email protected]>
AuthorDate: Thu Feb 2 21:48:40 2023 -0800

    KAFKA-14491: [4/N] Improvements to segment value format for RocksDB versioned store (#13186)
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 ...RocksDBVersionedStoreSegmentValueFormatter.java | 133 +++++++++++++--------
 ...sDBVersionedStoreSegmentValueFormatterTest.java |  12 +-
 2 files changed, 87 insertions(+), 58 deletions(-)
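
The row layout described in the class Javadoc of the first changed file can be sketched as below. This is a minimal illustration, not the code from this commit: the class RowLayoutSketch, its Version type, and the 4-byte width of value_size are assumptions made for the example (only the 8-byte timestamp width appears in the diff, as TIMESTAMP_SIZE).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch of the documented row layout; hypothetical, not the Kafka implementation. */
final class RowLayoutSketch {
    static final int TIMESTAMP_SIZE = 8; // same width as the constant visible in the diff
    static final int VALUE_SIZE = 4;     // assumption: value_size is stored as a 4-byte int

    /** One record version; a null value stands for a tombstone. */
    static final class Version {
        final long validFrom;
        final byte[] value;

        Version(final long validFrom, final byte[] value) {
            this.validFrom = validFrom;
            this.value = value;
        }
    }

    /** Encodes a non-empty list of versions given latest-first (reverse-sorted by validFrom). */
    static byte[] encode(final long nextTimestamp, final List<Version> latestFirst) {
        int size = 2 * TIMESTAMP_SIZE;
        for (final Version v : latestFirst) {
            size += TIMESTAMP_SIZE + VALUE_SIZE + (v.value == null ? 0 : v.value.length);
        }
        final ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putLong(nextTimestamp);
        buf.putLong(latestFirst.get(latestFirst.size() - 1).validFrom); // min_timestamp
        for (final Version v : latestFirst) {
            buf.putLong(v.validFrom);
            buf.putInt(v.value == null ? -1 : v.value.length); // negative size marks a tombstone
        }
        // values are forward-sorted by timestamp, so the earliest value is written first
        for (int i = latestFirst.size() - 1; i >= 0; i--) {
            final byte[] value = latestFirst.get(i).value;
            if (value != null) {
                buf.put(value);
            }
        }
        return buf.array();
    }

    /** Decodes latest-first, stopping once the record with validFrom == min_timestamp is read. */
    static List<Version> decode(final byte[] row) {
        final ByteBuffer buf = ByteBuffer.wrap(row);
        buf.getLong(); // next_timestamp, not needed to walk the list
        final long minTimestamp = buf.getLong();
        final List<Version> result = new ArrayList<>();
        int valueEnd = row.length; // the latest value sits at the very end of the row
        long timestamp;
        do {
            timestamp = buf.getLong();
            final int valueSize = buf.getInt();
            byte[] value = null;
            if (valueSize >= 0) {
                value = Arrays.copyOfRange(row, valueEnd - valueSize, valueEnd);
                valueEnd -= valueSize;
            }
            result.add(new Version(timestamp, value));
        } while (timestamp != minTimestamp);
        return result;
    }

    public static void main(final String[] args) {
        final List<Version> versions = Arrays.asList(
            new Version(20L, new byte[] {7, 8}),
            new Version(15L, null),
            new Version(10L, new byte[] {1}));
        final byte[] row = encode(30L, versions);
        System.out.println(row.length + " bytes, " + decode(row).size() + " versions");
    }
}
```

Because the record count is not stored, decode() relies on the min_timestamp sentinel to stop, which is the property the Javadoc calls out. A degenerate row (a single tombstone with min_timestamp equal to next_timestamp) decodes the same way.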

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
index ac6fbf3c1dc..7ea9cee354e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
@@ -22,16 +22,17 @@ import java.util.List;
 
 /**
  * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
- * The value format is:
+ * All record versions for the same key (in the same segment) are collected into a single row.
+ * The value format for each row is:
  * <pre>
  *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
  * </pre>
  * where:
  * <ul>
  * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
- * segment,</li>
+ * row,</li>
  * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
- * in this segment, and</li>
+ * in this row, and</li>
  * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
  * in order to distinguish from empty array which has {@code value_size} of zero. In practice,
  * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
@@ -39,35 +40,54 @@ import java.util.List;
  * </ul>
  * <p>
  * Note that the value format above does not store the number of record versions contained in the
- * segment. It is not necessary to store this information separately because this information is
+ * row. It is not necessary to store this information separately because this information is
  * never required on its own. Record versions are always deserialized in order, and we can
  * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
  * the record version equals the {@code min_timestamp}.
  * <p>
  * There is one edge case with regards to the segment value format described above, which is useful
  * to know for understanding the code in this file, but not relevant for callers of the class.
+ * Only continue reading if you want to understand details for reading the code in this file itself.
+ * <p>
  * In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the
- * segment will form a strictly increasing sequence, i.e., it is not valid to have a record version
+ * segment row will form a strictly increasing sequence, i.e., it is not valid to have a record version
  * with validTo timestamp equal to (or less than) its validFrom timestamp. The one edge case /
- * exception is when the latest record version (for a particular key) is a tombstone, and the
- * segment in which this tombstone is to be stored contains currently no record versions.
- * This case will result in a "degenerate" segment containing the single tombstone, with both
+ * exception occurs when the store contains no record versions (for a particular key) and a tombstone
+ * is put. This case will result in a "degenerate" segment row containing the single tombstone, with both
  * {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) timestamp of the
- * tombstone. (It is valid to interpret this tombstone's validTo timestamp as being equal to its
- * validFrom timestamp, as querying for the latest record version as of a later timestamp will
- * correctly return that no record version is present.) Note also that after a "degenerate" segment
- * has formed, it's possible that the segment will remain degenerate even as newer record versions
- * are added. (For example, if additional puts happen with later timestamps such that those puts
- * only affect later segments, then the earlier degenerate segment will remain degenerate.)
+ * tombstone. (The reverse-sorted list of {@code <timestamp, value_size>} in the row format will
+ * still contain a single tombstone, as usual.)
+ * <p>
+ * The reason this happens is because tracking the tombstone is still required to maintain the correct
+ * history for the key in question (in case an out-of-order record inserted to the store later
+ * is deleted by this tombstone). As usual, this tombstone cannot be put into the latest value store,
+ * as the latest value store has no expiry mechanism and we risk tombstones accumulating indefinitely.
+ * What's different about this case is that usually we put the tombstone into a segment store as the
+ * validTo timestamp of another record (the previous contents of the latest value store), but there
+ * is no other record in this case. So, the tombstone is put into the segment as its own record,
+ * with a well-defined validFrom timestamp but an undefined validTo. Because we can't use a validTo
+ * of infinity to pick a segment, we instead use validTo as the tombstone's own timestamp as a
+ * surrogate to select a segment. This works because querying for the latest record version as of a
+ * later timestamp will correctly return that no record version is present.
+ * <p>
+ * The same edge case also occurs if the store does not literally contain no existing record
+ * versions (for the particular key), so long as no record versions are relevant for the put process.
+ * Specifically, if the latest record version is already a tombstone and that tombstone is stored
+ * in an older segment than is relevant for the timestamp of the new tombstone being put, then the
+ * store "appears" empty during the put process for the new tombstone as the old segments are not
+ * searched for the newer put. Note also that after a "degenerate" segment row has formed, it's possible
+ * that the row will remain degenerate even as newer record versions are added. (For example,
+ * if additional puts happen with later timestamps such that those puts only affect later segments,
+ * then the earlier segment with the degenerate row will remain the same.)
  * <p>
- * Callers of this class need not concern themselves with this detail because all the exposed
- * methods function as expected, even in the degenerate segment case. All methods may still be
- * called, with the exception of {@link SegmentValue#find(long, boolean)} and those that depend
- * on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
+ * Callers of this class need not concern themselves with this edge case / details of degenerate
+ * segment rows because all the exposed methods function as expected, even in the degenerate case.
+ * All methods may still be called, with the exception of {@link SegmentValue#find(long, boolean)}
+ * and those that depend on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
  * {@link SegmentValue#insert(long, byte[], int)}). Missing support for calling these methods on
- * degenerate segments is not an issue because the same timestamp bounds restrictions required for
- * calling {@link SegmentValue#find(long, boolean)} on regular segments serve to prevent callers
- * from calling the method on degenerate segments as well.
+ * degenerate rows is not an issue because the same timestamp bounds restrictions required for
+ * calling {@link SegmentValue#find(long, boolean)} on regular segment rows serve to prevent callers
+ * from calling the method on degenerate rows as well.
  */
 final class RocksDBVersionedStoreSegmentValueFormatter {
     private static final int TIMESTAMP_SIZE = 8;
@@ -114,11 +134,10 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
     interface SegmentValue {
 
         /**
-         * Finds the latest record in this segment with (validFrom) timestamp not exceeding the
+         * Finds the latest record in this segment row with (validFrom) timestamp not exceeding the
          * provided timestamp bound. This method requires that the provided timestamp bound exists
-         * in this segment, i.e., that the provided timestamp bound is at least minTimestamp and
-         * is smaller than nextTimestamp. As a result of this requirement, it is not permitted to
-         * call this method on degenerate segments.
+         * in this segment, i.e., that the provided timestamp bound is at least {@code minTimestamp}
+         * and is smaller than {@code nextTimestamp}.
          *
          * @param timestamp the timestamp to find
          * @param includeValue whether the value of the found record should be returned with the result
@@ -128,14 +147,13 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
         SegmentSearchResult find(long timestamp, boolean includeValue);
 
         /**
-         * Inserts the provided record into the segment as the latest record in the segment.
-         * This operation is allowed even if the segment is degenerate.
+         * Inserts the provided record into the segment as the latest record in the segment row.
          * <p>
          * It is the caller's responsibility to ensure that this action is desirable. In the event
          * that the new record's (validFrom) timestamp is smaller than the current
-         * {@code nextTimestamp} of the segment, the operation will still be performed, and the
-         * segment's existing contents will be truncated to ensure consistency of timestamps within
-         * the segment. This truncation behavior helps reconcile inconsistencies between different
+         * {@code nextTimestamp} of the segment row, the operation will still be performed, and the
+         * row's existing contents will be truncated to ensure consistency of timestamps within
+         * the segment row. This truncation behavior helps reconcile inconsistencies between different
          * segments, or between a segment and the latest value store, of a
          * {@link RocksDBVersionedStore} instance.
          *
@@ -146,10 +164,10 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
         void insertAsLatest(long validFrom, long validTo, byte[] value);
 
         /**
-         * Inserts the provided record into the segment as the earliest record in the segment.
-         * This operation is allowed even if the segment is degenerate. It is the caller's responsibility
-         * to ensure that this action is valid, i.e., that record's (validFrom) timestamp is smaller
-         * than the current {@code minTimestamp} of the segment.
+         * Inserts the provided record into the segment row as the earliest record in the row.
+         * It is the caller's responsibility to ensure that this action is valid, i.e.,
+         * that record's (validFrom) timestamp is smaller than the current {@code minTimestamp}
+         * of the segment row.
          *
          * @param timestamp the (validFrom) timestamp of the record to insert
          * @param value the value of the record to insert
@@ -157,10 +175,9 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
         void insertAsEarliest(long timestamp, byte[] value);
 
         /**
-         * Inserts the provided record into the segment at the provided index. This operation
-         * requires that the segment is not degenerate, and that
-         * {@link SegmentValue#find(long, boolean)} has already been called in order to deserialize
-         * the relevant index (to insert into index n requires that index n-1 has already been deserialized).
+         * Inserts the provided record into the segment row at the provided index. This operation
+         * requires that {@link SegmentValue#find(long, boolean)} has already been called in order to deserialize
+         * the relevant index (to insert into index n requires that index n has already been deserialized).
          * <p>
          * It is the caller's responsibility to ensure that this action makes sense, i.e., that the
          * insertion index is correct for the (validFrom) timestamp of the record being inserted.
@@ -168,8 +185,8 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
          * @param timestamp the (validFrom) timestamp of the record to insert
          * @param value the value of the record to insert
          * @param index the index that the newly inserted record should occupy
-         * @throws IllegalArgumentException if the segment is degenerate, if the provided index is out of
-         *         bounds, or if {@code find()} has not been called to deserialize the relevant index.
+         * @throws IllegalArgumentException if the provided index is out of bounds, or if
+         *         {@code find()} has not been called to deserialize the relevant index.
          */
         void insert(long timestamp, byte[] value, int index);
 
@@ -177,10 +194,9 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
          * Updates the record at the provided index with the provided value and (validFrom)
          * timestamp. This operation requires that {@link SegmentValue#find(long, boolean)} has
          * already been called in order to deserialize the relevant index (i.e., the one being updated).
-         * (As a result, it is not valid to call this method on a degenerate segment.)
          * <p>
          * It is the caller's responsibility to ensure that this action makes sense, i.e., that the
-         * updated (validFrom) timestamp does not violate timestamp order within the segment.
+         * updated (validFrom) timestamp does not violate timestamp order within the segment row.
          *
          * @param timestamp the updated record (validFrom) timestamp
          * @param value the updated record value
@@ -268,6 +284,8 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
             if (timestamp >= nextTimestamp) {
                 throw new IllegalArgumentException("Timestamp is too large to be found in this segment.");
             }
+            // for degenerate segments, minTimestamp == nextTimestamp, and we will always throw an exception
+            // thus, we don't need to handle the degenerate case below
 
             long currNextTimestamp = nextTimestamp;
             long currTimestamp = -1L; // choose an invalid timestamp. if this is valid, this needs to be re-worked
@@ -319,7 +337,7 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
         public void insertAsLatest(final long validFrom, final long validTo, final byte[] valueOrNull) {
             final ValueAndValueSize value = new ValueAndValueSize(valueOrNull);
 
-            if (nextTimestamp > validFrom) {
+            if (validFrom < nextTimestamp) {
                 // detected inconsistency edge case where older segment has [a,b) while newer store
                 // has [a,c), due to [b,c) having failed to write to newer store.
                 // remove entries from this store until the overlap is resolved.
@@ -327,21 +345,21 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
                 throw new UnsupportedOperationException("case not yet implemented");
             }
 
-            if (nextTimestamp != validFrom) {
-                // move nextTimestamp into list as tombstone and add new record on top
+            if (nextTimestamp == validFrom) {
+                // nextTimestamp is moved into segment automatically as record is added on top
                 if (isDegenerate) {
-                    initializeWithRecord(new ValueAndValueSize(null), nextTimestamp, validFrom);
+                    initializeWithRecord(value, validFrom, validTo);
                 } else {
-                    insert(nextTimestamp, null, 0);
+                    doInsert(validFrom, value, 0);
                 }
-                doInsert(validFrom, value, 0);
             } else {
-                // nextTimestamp is moved into segment automatically as record is added on top
+                // move nextTimestamp into list as tombstone and add new record on top
                 if (isDegenerate) {
-                    initializeWithRecord(value, validFrom, validTo);
+                    initializeWithRecord(new ValueAndValueSize(null), nextTimestamp, validFrom);
                 } else {
-                    doInsert(validFrom, value, 0);
+                    doInsert(nextTimestamp, new ValueAndValueSize(null), 0);
                 }
+                doInsert(validFrom, value, 0);
             }
             // update nextTimestamp
             nextTimestamp = validTo;
@@ -362,13 +380,22 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
 
         @Override
         public void insert(final long timestamp, final byte[] valueOrNull, final int index) {
+            // public-facing method contains stricter index requirement than the internal helper
+            // method doInsert() below
+            if (index > deserIndex) {
+                throw new IllegalArgumentException("Must invoke find() to deserialize record before insert() at specific index.");
+            }
+
             final ValueAndValueSize value = new ValueAndValueSize(valueOrNull);
             doInsert(timestamp, value, index);
         }
 
         private void doInsert(final long timestamp, final ValueAndValueSize value, final int index) {
-            if (isDegenerate || index > deserIndex + 1 || index < 0) {
-                throw new IllegalArgumentException("Must invoke find() to deserialize record before insert() at specific index.");
+            if (index > deserIndex + 1) {
+                throw new IllegalStateException("Must invoke find() to deserialize record before insert() at specific index.");
+            }
+            if (isDegenerate || index < 0) {
+                throw new IllegalStateException("Cannot insert at negative index or into degenerate segment.");
             }
 
             final boolean needsMinTsUpdate = isLastIndex(index - 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
index 9b08197a061..1311aac4078 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
@@ -95,7 +95,7 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
         }
 
         // test inserting at each possible index
-        for (int insertIdx = 0; insertIdx <= testCase.records.size(); insertIdx++) {
+        for (int insertIdx = 0; insertIdx <= testCase.records.size() - 1; insertIdx++) {
             // build record to insert
             final long newRecordTimestamp;
             if (insertIdx == 0) {
@@ -106,7 +106,7 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
                 }
             } else {
                 newRecordTimestamp = testCase.records.get(insertIdx - 1).timestamp - 1;
-                if (newRecordTimestamp < 0 || (insertIdx < testCase.records.size() && newRecordTimestamp == testCase.records.get(insertIdx).timestamp)) {
+                if (newRecordTimestamp < 0 || (newRecordTimestamp == testCase.records.get(insertIdx).timestamp)) {
                     // cannot insert because timestamps of existing records are adjacent
                     continue;
                 }
@@ -116,9 +116,8 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
             final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
 
             // insert() first requires a call to find()
-            if (insertIdx > 0) {
-                segmentValue.find(testCase.records.get(insertIdx - 1).timestamp, false);
-            }
+            segmentValue.find(testCase.records.get(insertIdx).timestamp, false);
+
             segmentValue.insert(newRecord.timestamp, newRecord.value, insertIdx);
 
             // create expected results
@@ -176,6 +175,9 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
 
         // build expected mapping from timestamp -> record
         final Map<Long, Integer> expectedRecordIndices = new HashMap<>();
+        // it's important that this for-loop iterates backwards through the record indices, so that
+        // when adjacent records have adjacent timestamps, then the record with the later timestamp
+        // (i.e., the earlier index) takes precedence
         for (int recordIdx = testCase.records.size() - 1; recordIdx >= 0; recordIdx--) {
             if (recordIdx < testCase.records.size() - 1) {
                 expectedRecordIndices.put(testCase.records.get(recordIdx).timestamp - 1, recordIdx + 1);
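
The code comment added to find() in this commit says degenerate rows always hit an exception. That follows from the documented contract (the bound must be at least minTimestamp and smaller than nextTimestamp): when the two are equal, no timestamp satisfies both. A minimal sketch, assuming a hypothetical standalone helper class FindBoundsSketch; the lower-bound check and its message are not shown in the diff and are assumed here.

```java
/** Hypothetical helper mirroring the documented bounds contract of find(); not the Kafka code. */
final class FindBoundsSketch {

    /** Throws unless minTimestamp <= timestamp < nextTimestamp. */
    static void checkBounds(final long timestamp, final long minTimestamp, final long nextTimestamp) {
        if (timestamp < minTimestamp) {
            // assumption: this check and its message do not appear in the diff
            throw new IllegalArgumentException("Timestamp is too small to be found in this segment.");
        }
        if (timestamp >= nextTimestamp) {
            // this check and message are visible in the diff
            throw new IllegalArgumentException("Timestamp is too large to be found in this segment.");
        }
    }

    /** Convenience wrapper: true if checkBounds() would throw for these arguments. */
    static boolean isRejected(final long timestamp, final long minTimestamp, final long nextTimestamp) {
        try {
            checkBounds(timestamp, minTimestamp, nextTimestamp);
            return false;
        } catch (final IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        // regular row covering [10, 20)
        System.out.println(isRejected(10L, 10L, 20L)); // false
        System.out.println(isRejected(20L, 10L, 20L)); // true
        // degenerate row: minTimestamp == nextTimestamp == 15, so every timestamp is rejected
        System.out.println(isRejected(14L, 15L, 15L)); // true
        System.out.println(isRejected(15L, 15L, 15L)); // true
        System.out.println(isRejected(16L, 15L, 15L)); // true
    }
}
```

This is why the commit can drop the explicit "not permitted on degenerate segments" wording from the find() Javadoc: the bounds requirement already excludes that case.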
