This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b8e606355b7 KAFKA-14491: [4/N] Improvements to segment value format for RocksDB versioned store (#13186)
b8e606355b7 is described below
commit b8e606355b71528aa438e80ffeb3042d0d586998
Author: Victoria Xia <[email protected]>
AuthorDate: Thu Feb 2 21:48:40 2023 -0800
KAFKA-14491: [4/N] Improvements to segment value format for RocksDB versioned store (#13186)
Reviewers: Matthias J. Sax <[email protected]>
---
...RocksDBVersionedStoreSegmentValueFormatter.java | 133 +++++++++++++--------
...sDBVersionedStoreSegmentValueFormatterTest.java | 12 +-
2 files changed, 87 insertions(+), 58 deletions(-)
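For orientation before the diff: the javadoc changed in this commit describes each segment row as <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>, with a negative value_size marking a tombstone. Below is a minimal sketch of that layout. It assumes value_size is a 4-byte int; the diff only shows TIMESTAMP_SIZE = 8. SegmentRowSketch, Version, serialize() and readHeaders() are hypothetical names, not Kafka's API. The reader also shows why no record count is needed: it stops at the entry whose timestamp equals min_timestamp.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the segment row layout from the javadoc in this commit:
// <next_timestamp> + <min_timestamp> + <(timestamp, value_size) pairs, newest first>
// + <values, oldest first>. Class and method names are illustrative, not Kafka's.
final class SegmentRowSketch {
    static final int TIMESTAMP_SIZE = 8;  // same value as the constant in the diff
    static final int VALUE_SIZE_SIZE = 4; // assumption: value_size is a 4-byte int

    /** One record version; a null value denotes a tombstone. */
    static final class Version {
        final long validFrom;
        final byte[] value;

        Version(final long validFrom, final byte[] value) {
            this.validFrom = validFrom;
            this.value = value;
        }
    }

    /** Serializes versions given oldest-first; nextTimestamp is the validTo of the newest. */
    static byte[] serialize(final List<Version> oldestFirst, final long nextTimestamp) {
        int valuesSize = 0;
        for (final Version v : oldestFirst) {
            valuesSize += v.value == null ? 0 : v.value.length;
        }
        final ByteBuffer buf = ByteBuffer.allocate(2 * TIMESTAMP_SIZE
            + oldestFirst.size() * (TIMESTAMP_SIZE + VALUE_SIZE_SIZE) + valuesSize);
        buf.putLong(nextTimestamp);
        buf.putLong(oldestFirst.get(0).validFrom); // min_timestamp
        // (timestamp, value_size) pairs, reverse-sorted by timestamp
        for (int i = oldestFirst.size() - 1; i >= 0; i--) {
            final Version v = oldestFirst.get(i);
            buf.putLong(v.validFrom);
            buf.putInt(v.value == null ? -1 : v.value.length); // -1 marks a tombstone
        }
        // values, forward-sorted by timestamp (tombstones contribute no bytes)
        for (final Version v : oldestFirst) {
            if (v.value != null) {
                buf.put(v.value);
            }
        }
        return buf.array();
    }

    /**
     * Reads the (timestamp, value_size) pairs newest-first. No count is stored, so the
     * loop stops once it reads the entry whose timestamp equals min_timestamp.
     */
    static List<long[]> readHeaders(final byte[] row) {
        final ByteBuffer buf = ByteBuffer.wrap(row);
        buf.getLong(); // skip next_timestamp
        final long minTimestamp = buf.getLong();
        final List<long[]> headers = new ArrayList<>();
        long timestamp;
        do {
            timestamp = buf.getLong();
            final int valueSize = buf.getInt();
            headers.add(new long[] {timestamp, valueSize});
        } while (timestamp != minTimestamp);
        return headers;
    }
}
```

Under these assumptions, a degenerate row (a single tombstone with min_timestamp == next_timestamp) takes 28 bytes: two timestamps plus one (timestamp, -1) pair. As the updated javadoc notes, its reverse-sorted list still holds that one tombstone entry.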
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
index ac6fbf3c1dc..7ea9cee354e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
@@ -22,16 +22,17 @@ import java.util.List;
/**
* Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
- * The value format is:
+ * All record versions for the same key (in the same segment) are collected into a single row.
+ * The value format for each row is:
* <pre>
* <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
* </pre>
* where:
* <ul>
* <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
- * segment,</li>
+ * row,</li>
* <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
- * in this segment, and</li>
+ * in this row, and</li>
* <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
* in order to distinguish from empty array which has {@code value_size} of zero. In practice,
* {@code value_size} is always set to -1 for the tombstone case, though this need not be true
@@ -39,35 +40,54 @@ import java.util.List;
* </ul>
* <p>
* Note that the value format above does not store the number of record versions contained in the
- * segment. It is not necessary to store this information separately because this information is
+ * row. It is not necessary to store this information separately because this information is
* never required on its own. Record versions are always deserialized in order, and we can
* determine when we have reached the end of the list based on whether the (validFrom) timestamp of
* the record version equals the {@code min_timestamp}.
* <p>
* There is one edge case with regards to the segment value format described above, which is useful
* to know for understanding the code in this file, but not relevant for callers of the class.
+ * Only continue reading if you want to understand details for reading the code in this file itself.
+ * <p>
* In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the
- * segment will form a strictly increasing sequence, i.e., it is not valid to have a record version
+ * segment row will form a strictly increasing sequence, i.e., it is not valid to have a record version
* with validTo timestamp equal to (or less than) its validFrom timestamp. The one edge case /
- * exception is when the latest record version (for a particular key) is a tombstone, and the
- * segment in which this tombstone is to be stored contains currently no record versions.
- * This case will result in a "degenerate" segment containing the single tombstone, with both
+ * exception occurs when the store contains no record versions (for a particular key) and a tombstone
+ * is put. This case will result in a "degenerate" segment row containing the single tombstone, with both
* {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) timestamp of the
- * tombstone. (It is valid to interpret this tombstone's validTo timestamp as being equal to its
- * validFrom timestamp, as querying for the latest record version as of a later timestamp will
- * correctly return that no record version is present.) Note also that after a "degenerate" segment
- * has formed, it's possible that the segment will remain degenerate even as newer record versions
- * are added. (For example, if additional puts happen with later timestamps such that those puts
- * only affect later segments, then the earlier degenerate segment will remain degenerate.)
+ * tombstone. (The reverse-sorted list of {@code <timestamp, value_size>} in the row format will
+ * still contain a single tombstone, as usual.)
+ * <p>
+ * The reason this happens is because tracking the tombstone is still required to maintain the correct
+ * history for the key in question (in case an out-of-order record inserted to the store later
+ * is deleted by this tombstone). As usual, this tombstone cannot be put into the latest value store,
+ * as the latest value store has no expiry mechanism and we risk tombstones accumulating indefinitely.
+ * What's different about this case is that usually we put the tombstone into a segment store as the
+ * validTo timestamp of another record (the previous contents of the latest value store), but there
+ * is no other record in this case. So, the tombstone is put into the segment as its own record,
+ * with a well-defined validFrom timestamp but an undefined validTo. Because we can't use a validTo
+ * of infinity to pick a segment, we instead use validTo as the tombstone's own timestamp as a
+ * surrogate to select a segment. This works because querying for the latest record version as of a
+ * later timestamp will correctly return that no record version is present
+ * <p>
+ * The same edge case also occurs if the store does not literally contain no existing record
+ * versions (for the particular key), so long as no record versions are relevant for the put process.
+ * Specifically, if the latest record version is already a tombstone and that tombstone is stored
+ * in an older segment than is relevant for the timestamp of the new tombstone being put, then the
+ * store "appears" empty during the put process for the new tombstone as the old segments are not
+ * searched for the newer put. Note also that after a "degenerate" segment row has formed, it's possible
+ * that the row will remain degenerate even as newer record versions are added. (For example,
+ * if additional puts happen with later timestamps such that those puts only affect later segments,
+ * then the earlier segment with the degenerate row will remain the same.)
* <p>
- * Callers of this class need not concern themselves with this detail because all the exposed
- * methods function as expected, even in the degenerate segment case. All methods may still be
- * called, with the exception of {@link SegmentValue#find(long, boolean)} and those that depend
- * on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
+ * Callers of this class need not concern themselves with this edge case / details of degenerate
+ * segment rows because all the exposed methods function as expected, even in the degenerate case.
+ * All methods may still be called, with the exception of {@link SegmentValue#find(long, boolean)}
+ * and those that depend on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
* {@link SegmentValue#insert(long, byte[], int)}). Missing support for calling these methods on
- * degenerate segments is not an issue because the same timestamp bounds restrictions required for
- * calling {@link SegmentValue#find(long, boolean)} on regular segments serve to prevent callers
- * from calling the method on degenerate segments as well.
+ * degenerate rows is not an issue because the same timestamp bounds restrictions required for
+ * calling {@link SegmentValue#find(long, boolean)} on regular segment rows serve to prevent callers
+ * from calling the method on degenerate rows as well.
*/
final class RocksDBVersionedStoreSegmentValueFormatter {
private static final int TIMESTAMP_SIZE = 8;
@@ -114,11 +134,10 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
interface SegmentValue {
/**
- * Finds the latest record in this segment with (validFrom) timestamp not exceeding the
+ * Finds the latest record in this segment row with (validFrom) timestamp not exceeding the
* provided timestamp bound. This method requires that the provided timestamp bound exists
- * in this segment, i.e., that the provided timestamp bound is at least minTimestamp and
- * is smaller than nextTimestamp. As a result of this requirement, it is not permitted to
- * call this method on degenerate segments.
+ * in this segment, i.e., that the provided timestamp bound is at least {@code minTimestamp}
+ * and is smaller than {@code nextTimestamp}.
*
* @param timestamp the timestamp to find
* @param includeValue whether the value of the found record should be returned with the result
@@ -128,14 +147,13 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
SegmentSearchResult find(long timestamp, boolean includeValue);
/**
- * Inserts the provided record into the segment as the latest record in the segment.
- * This operation is allowed even if the segment is degenerate.
+ * Inserts the provided record into the segment as the latest record in the segment row.
* <p>
* It is the caller's responsibility to ensure that this action is desirable. In the event
* that the new record's (validFrom) timestamp is smaller than the current
- * {@code nextTimestamp} of the segment, the operation will still be performed, and the
- * segment's existing contents will be truncated to ensure consistency of timestamps within
- * the segment. This truncation behavior helps reconcile inconsistencies between different
+ * {@code nextTimestamp} of the segment row, the operation will still be performed, and the
+ * row's existing contents will be truncated to ensure consistency of timestamps within
+ * the segment row. This truncation behavior helps reconcile inconsistencies between different
* segments, or between a segment and the latest value store, of a
* {@link RocksDBVersionedStore} instance.
*
@@ -146,10 +164,10 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
void insertAsLatest(long validFrom, long validTo, byte[] value);
/**
- * Inserts the provided record into the segment as the earliest record in the segment.
- * This operation is allowed even if the segment is degenerate. It is the caller's responsibility
- * to ensure that this action is valid, i.e., that record's (validFrom) timestamp is smaller
- * than the current {@code minTimestamp} of the segment.
+ * Inserts the provided record into the segment row as the earliest record in the row.
+ * It is the caller's responsibility to ensure that this action is valid, i.e.,
+ * that record's (validFrom) timestamp is smaller than the current {@code minTimestamp}
+ * of the segment row.
*
* @param timestamp the (validFrom) timestamp of the record to insert
* @param value the value of the record to insert
@@ -157,10 +175,9 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
void insertAsEarliest(long timestamp, byte[] value);
/**
- * Inserts the provided record into the segment at the provided index. This operation
- * requires that the segment is not degenerate, and that
- * {@link SegmentValue#find(long, boolean)} has already been called in order to deserialize
- * the relevant index (to insert into index n requires that index n-1 has already been deserialized).
+ * Inserts the provided record into the segment row at the provided index. This operation
+ * requires that {@link SegmentValue#find(long, boolean)} has already been called in order to deserialize
+ * the relevant index (to insert into index n requires that index n has already been deserialized).
* <p>
* It is the caller's responsibility to ensure that this action makes sense, i.e., that the
* insertion index is correct for the (validFrom) timestamp of the record being inserted.
@@ -168,8 +185,8 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
* @param timestamp the (validFrom) timestamp of the record to insert
* @param value the value of the record to insert
* @param index the index that the newly inserted record should occupy
- * @throws IllegalArgumentException if the segment is degenerate, if the provided index is out of
- * bounds, or if {@code find()} has not been called to deserialize the relevant index.
+ * @throws IllegalArgumentException if the provided index is out of bounds, or if
+ * {@code find()} has not been called to deserialize the relevant index.
*/
void insert(long timestamp, byte[] value, int index);
@@ -177,10 +194,9 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
* Updates the record at the provided index with the provided value and (validFrom)
* timestamp. This operation requires that {@link SegmentValue#find(long, boolean)} has
* already been called in order to deserialize the relevant index (i.e., the one being updated).
- * (As a result, it is not valid to call this method on a degenerate segment.)
* <p>
* It is the caller's responsibility to ensure that this action makes sense, i.e., that the
- * updated (validFrom) timestamp does not violate timestamp order within the segment.
+ * updated (validFrom) timestamp does not violate timestamp order within the segment row.
*
* @param timestamp the updated record (validFrom) timestamp
* @param value the updated record value
@@ -268,6 +284,8 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
if (timestamp >= nextTimestamp) {
throw new IllegalArgumentException("Timestamp is too large to be found in this segment.");
}
+ // for degenerate segments, minTimestamp == nextTimestamp, and we will always throw an exception
+ // thus, we don't need to handle the degenerate case below
long currNextTimestamp = nextTimestamp;
long currTimestamp = -1L; // choose an invalid timestamp. if this is valid, this needs to be re-worked
@@ -319,7 +337,7 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
public void insertAsLatest(final long validFrom, final long validTo, final byte[] valueOrNull) {
final ValueAndValueSize value = new ValueAndValueSize(valueOrNull);
- if (nextTimestamp > validFrom) {
+ if (validFrom < nextTimestamp) {
// detected inconsistency edge case where older segment has [a,b) while newer store
// has [a,c), due to [b,c) having failed to write to newer store.
// remove entries from this store until the overlap is resolved.
@@ -327,21 +345,21 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
throw new UnsupportedOperationException("case not yet implemented");
}
- if (nextTimestamp != validFrom) {
- // move nextTimestamp into list as tombstone and add new record on top
+ if (nextTimestamp == validFrom) {
+ // nextTimestamp is moved into segment automatically as record is added on top
if (isDegenerate) {
- initializeWithRecord(new ValueAndValueSize(null), nextTimestamp, validFrom);
+ initializeWithRecord(value, validFrom, validTo);
} else {
- insert(nextTimestamp, null, 0);
+ doInsert(validFrom, value, 0);
}
- doInsert(validFrom, value, 0);
} else {
- // nextTimestamp is moved into segment automatically as record is added on top
+ // move nextTimestamp into list as tombstone and add new record on top
if (isDegenerate) {
- initializeWithRecord(value, validFrom, validTo);
+ initializeWithRecord(new ValueAndValueSize(null), nextTimestamp, validFrom);
} else {
- doInsert(validFrom, value, 0);
+ doInsert(nextTimestamp, new ValueAndValueSize(null), 0);
}
+ doInsert(validFrom, value, 0);
}
// update nextTimestamp
nextTimestamp = validTo;
@@ -362,13 +380,22 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
@Override
public void insert(final long timestamp, final byte[] valueOrNull, final int index) {
+ // public-facing method contains stricter index requirement than the internal helper
+ // method doInsert() below
+ if (index > deserIndex) {
+ throw new IllegalArgumentException("Must invoke find() to deserialize record before insert() at specific index.");
+ }
+
final ValueAndValueSize value = new ValueAndValueSize(valueOrNull);
doInsert(timestamp, value, index);
}
private void doInsert(final long timestamp, final ValueAndValueSize value, final int index) {
- if (isDegenerate || index > deserIndex + 1 || index < 0) {
- throw new IllegalArgumentException("Must invoke find() to deserialize record before insert() at specific index.");
+ if (index > deserIndex + 1) {
+ throw new IllegalStateException("Must invoke find() to deserialize record before insert() at specific index.");
+ }
+ if (isDegenerate || index < 0) {
+ throw new IllegalStateException("Cannot insert at negative index or into degenerate segment.");
}
final boolean needsMinTsUpdate = isLastIndex(index - 1);
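The reordered branches in insertAsLatest() above are easier to follow in a simplified in-memory model. The sketch below mirrors only their control flow. When validFrom equals the current nextTimestamp, the new record goes on top. When there is a gap, a tombstone covering [nextTimestamp, validFrom) goes in first. A degenerate row is re-initialized rather than inserted into. RowModel, Version and degenerate() are hypothetical names. The rule "degenerate exactly when min_timestamp == next_timestamp" is an assumption drawn from the comment added to find(). The real formatter edits a serialized byte[] row instead of a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the reordered insertAsLatest() branches in the diff.
// It mirrors the control flow only; the real formatter edits a serialized byte[] row,
// and the names here (RowModel, Version, degenerate()) are illustrative.
final class RowModel {
    static final class Version {
        final long validFrom;
        final byte[] value; // null denotes a tombstone

        Version(final long validFrom, final byte[] value) {
            this.validFrom = validFrom;
            this.value = value;
        }
    }

    final List<Version> newestFirst = new ArrayList<>();
    long nextTimestamp;
    boolean isDegenerate;

    /** A degenerate row: one tombstone with min_timestamp == next_timestamp. */
    static RowModel degenerate(final long tombstoneTimestamp) {
        final RowModel row = new RowModel();
        row.initializeWithRecord(new Version(tombstoneTimestamp, null), tombstoneTimestamp);
        return row;
    }

    private void initializeWithRecord(final Version version, final long validTo) {
        newestFirst.clear();
        newestFirst.add(version);
        nextTimestamp = validTo;
        // assumption: a row is degenerate exactly when min_timestamp == next_timestamp
        isDegenerate = version.validFrom == validTo;
    }

    void insertAsLatest(final long validFrom, final long validTo, final byte[] value) {
        if (validFrom < nextTimestamp) {
            // truncation case; also left unimplemented in the diff
            throw new UnsupportedOperationException("case not yet implemented");
        }
        if (nextTimestamp == validFrom) {
            // the old nextTimestamp implicitly becomes the previous latest record's validTo
            if (isDegenerate) {
                initializeWithRecord(new Version(validFrom, value), validTo);
            } else {
                newestFirst.add(0, new Version(validFrom, value));
            }
        } else {
            // gap [nextTimestamp, validFrom): store it as a tombstone, then add the record on top
            if (isDegenerate) {
                initializeWithRecord(new Version(nextTimestamp, null), validFrom);
            } else {
                newestFirst.add(0, new Version(nextTimestamp, null));
            }
            newestFirst.add(0, new Version(validFrom, value));
        }
        nextTimestamp = validTo;
    }
}
```

The diff also changes the non-degenerate tombstone branch. It now calls the private doInsert() rather than the public insert(). The public insert() gained a stricter index > deserIndex check in the same commit.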
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
index 9b08197a061..1311aac4078 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
@@ -95,7 +95,7 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
}
// test inserting at each possible index
- for (int insertIdx = 0; insertIdx <= testCase.records.size(); insertIdx++) {
+ for (int insertIdx = 0; insertIdx <= testCase.records.size() - 1; insertIdx++) {
// build record to insert
final long newRecordTimestamp;
if (insertIdx == 0) {
@@ -106,7 +106,7 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
}
} else {
newRecordTimestamp = testCase.records.get(insertIdx - 1).timestamp - 1;
- if (newRecordTimestamp < 0 || (insertIdx < testCase.records.size() && newRecordTimestamp == testCase.records.get(insertIdx).timestamp)) {
+ if (newRecordTimestamp < 0 || (newRecordTimestamp == testCase.records.get(insertIdx).timestamp)) {
// cannot insert because timestamps of existing records are adjacent
continue;
}
@@ -116,9 +116,8 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
// insert() first requires a call to find()
- if (insertIdx > 0) {
- segmentValue.find(testCase.records.get(insertIdx - 1).timestamp, false);
- }
+ segmentValue.find(testCase.records.get(insertIdx).timestamp, false);
+
segmentValue.insert(newRecord.timestamp, newRecord.value, insertIdx);
// create expected results
@@ -176,6 +175,9 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
// build expected mapping from timestamp -> record
final Map<Long, Integer> expectedRecordIndices = new HashMap<>();
+ // it's important that this for-loop iterates backwards through the record indices, so that
+ // when adjacent records have adjacent timestamps, then the record with the later timestamp
+ // (i.e., the earlier index) takes precedence
for (int recordIdx = testCase.records.size() - 1; recordIdx >= 0; recordIdx--) {
if (recordIdx < testCase.records.size() - 1) {
expectedRecordIndices.put(testCase.records.get(recordIdx).timestamp - 1, recordIdx + 1);
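The test compares find() against an expected timestamp → index map. Its records are ordered newest-first; the added comment says the later timestamp has the earlier index. A brute-force reference for the documented find() contract can be sketched as follows: return the latest record whose validFrom does not exceed the bound, with the bound required to lie in [minTimestamp, nextTimestamp). FindReference is a hypothetical helper. Its linear scan is a swapped-in technique for illustration, not the formatter's incremental deserialization.

```java
import java.util.List;

// Hypothetical brute-force reference for the find() contract in the javadoc: return the
// index of the latest record whose validFrom <= timestamp. Records are newest-first
// (index 0 = latest), as in the test's records list. Names are illustrative.
final class FindReference {
    static int find(final List<Long> validFromNewestFirst, final long nextTimestamp, final long timestamp) {
        final long minTimestamp = validFromNewestFirst.get(validFromNewestFirst.size() - 1);
        if (timestamp < minTimestamp) {
            throw new IllegalArgumentException("Timestamp is below min_timestamp.");
        }
        if (timestamp >= nextTimestamp) {
            // a degenerate row has minTimestamp == nextTimestamp, so every call ends up in one
            // of these two checks, as the comment added to find() in the diff notes
            throw new IllegalArgumentException("Timestamp is too large to be found in this segment.");
        }
        // linear scan; the first (newest) record not after the bound is the answer
        for (int i = 0; i < validFromNewestFirst.size(); i++) {
            if (validFromNewestFirst.get(i) <= timestamp) {
                return i;
            }
        }
        throw new IllegalStateException("unreachable: timestamp >= minTimestamp");
    }
}
```

In this reference, querying validFrom[i] - 1 (when it is at least min_timestamp) resolves to index i + 1. That matches the timestamp - 1 → recordIdx + 1 entries the test puts into expectedRecordIndices.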