vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1093910815


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish from empty array which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * There is one edge case with regards to the segment value format described above, which is useful
+ * to know for understanding the code in this file, but not relevant for callers of the class.
+ * In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the
+ * segment will form a strictly increasing sequence, i.e., it is not valid to have a record version
+ * with validTo timestamp equal to (or less than) its validFrom timestamp. The one edge case /
+ * exception is when the latest record version (for a particular key) is a tombstone, and the
+ * segment in which this tombstone is to be stored contains currently no record versions.
+ * This case will result in a "degenerate" segment containing the single tombstone, with both
+ * {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) timestamp of the
+ * tombstone. (It is valid to interpret this tombstone's validTo timestamp as being equal to its
+ * validFrom timestamp, as querying for the latest record version as of a later timestamp will
+ * correctly return that no record version is present.) Note also that after a "degenerate" segment
+ * has formed, it's possible that the segment will remain degenerate even as newer record versions
+ * are added. (For example, if additional puts happen with later timestamps such that those puts
+ * only affect later segments, then the earlier degenerate segment will remain degenerate.)
+ * <p>
+ * Callers of this class need not concern themselves with this detail because all the exposed
+ * methods function as expected, even in the degenerate segment case. All methods may still be
+ * called, with the exception of {@link SegmentValue#find(long, boolean)} and those that depend

Review Comment:
   Clarified in https://github.com/apache/kafka/pull/13186. Hopefully it's 
clear now that users of the class really don't need to care; it's only to help 
readers understand the code in this class itself.
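
For readers following the layout described in the javadoc quoted above, here is a minimal sketch of how such a segment value could be written and read back. It is not the code from this PR: `SegmentLayoutSketch`, `Version`, `serialize` and `deserialize` are hypothetical names introduced for illustration only. The sketch assumes 8-byte timestamps, a 4-byte `value_size`, and `-1` as the tombstone marker, as the javadoc describes.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a hypothetical stand-in, not the formatter class from the PR.
final class SegmentLayoutSketch {
    static final int TIMESTAMP_SIZE = 8;
    static final int VALUE_SIZE = 4;

    // Hypothetical holder for one record version; a null value marks a tombstone.
    static final class Version {
        final long validFrom;
        final byte[] value;

        Version(final long validFrom, final byte[] value) {
            this.validFrom = validFrom;
            this.value = value;
        }
    }

    // Assumes `versions` is non-empty and sorted by ascending validFrom.
    static byte[] serialize(final List<Version> versions, final long nextTimestamp) {
        int valuesLength = 0;
        for (final Version v : versions) {
            if (v.value != null) {
                valuesLength += v.value.length;
            }
        }
        final ByteBuffer buf = ByteBuffer.allocate(
            2 * TIMESTAMP_SIZE + versions.size() * (TIMESTAMP_SIZE + VALUE_SIZE) + valuesLength);
        buf.putLong(nextTimestamp);                // next_timestamp
        buf.putLong(versions.get(0).validFrom);    // min_timestamp
        // Index of (timestamp, value_size) pairs, latest record first.
        for (int i = versions.size() - 1; i >= 0; i--) {
            final Version v = versions.get(i);
            buf.putLong(v.validFrom);
            buf.putInt(v.value == null ? -1 : v.value.length);
        }
        // Values, earliest record first; tombstones contribute no bytes.
        for (final Version v : versions) {
            if (v.value != null) {
                buf.put(v.value);
            }
        }
        return buf.array();
    }

    // Returns the record versions latest-first, mirroring the index order.
    static List<Version> deserialize(final byte[] segment) {
        final ByteBuffer buf = ByteBuffer.wrap(segment);
        final long minTimestamp = buf.getLong(TIMESTAMP_SIZE);
        final List<Version> result = new ArrayList<>();
        int indexPos = 2 * TIMESTAMP_SIZE;
        int valueEnd = segment.length; // the latest value sits at the very end
        long timestamp;
        do {
            timestamp = buf.getLong(indexPos);
            final int size = buf.getInt(indexPos + TIMESTAMP_SIZE);
            byte[] value = null;
            if (size >= 0) {
                value = new byte[size];
                System.arraycopy(segment, valueEnd - size, value, 0, size);
                valueEnd -= size;
            }
            result.add(new Version(timestamp, value));
            indexPos += TIMESTAMP_SIZE + VALUE_SIZE;
        } while (timestamp != minTimestamp); // no record count is stored
        return result;
    }
}
```

Note that the read loop never needs a record count: it stops as soon as it reads an index entry whose timestamp equals `min_timestamp`, which is the termination rule the javadoc relies on. The same loop also handles a degenerate segment, where the single tombstone's timestamp equals both header fields.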



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish from empty array which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * There is one edge case with regards to the segment value format described above, which is useful
+ * to know for understanding the code in this file, but not relevant for callers of the class.
+ * In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the
+ * segment will form a strictly increasing sequence, i.e., it is not valid to have a record version
+ * with validTo timestamp equal to (or less than) its validFrom timestamp. The one edge case /
+ * exception is when the latest record version (for a particular key) is a tombstone, and the
+ * segment in which this tombstone is to be stored contains currently no record versions.
+ * This case will result in a "degenerate" segment containing the single tombstone, with both
+ * {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) timestamp of the
+ * tombstone. (It is valid to interpret this tombstone's validTo timestamp as being equal to its
+ * validFrom timestamp, as querying for the latest record version as of a later timestamp will
+ * correctly return that no record version is present.) Note also that after a "degenerate" segment
+ * has formed, it's possible that the segment will remain degenerate even as newer record versions
+ * are added. (For example, if additional puts happen with later timestamps such that those puts
+ * only affect later segments, then the earlier degenerate segment will remain degenerate.)
+ * <p>
+ * Callers of this class need not concern themselves with this detail because all the exposed
+ * methods function as expected, even in the degenerate segment case. All methods may still be
+ * called, with the exception of {@link SegmentValue#find(long, boolean)} and those that depend
+ * on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
+ * {@link SegmentValue#insert(long, byte[], int)}). Missing support for calling these methods on
+ * degenerate segments is not an issue because the same timestamp bounds restrictions required for
+ * calling {@link SegmentValue#find(long, boolean)} on regular segments serve to prevent callers
+ * from calling the method on degenerate segments as well.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * @return the (validFrom) timestamp of the earliest record in the provided segment.
+     */
+    static long getMinTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+    }
+
+    /**
+     * @return the deserialized segment value
+     */
+    static SegmentValue deserialize(final byte[] segmentValue) {
+        return new PartiallyDeserializedSegmentValue(segmentValue);
+    }
+
+    /**
+     * Creates a new segment value that contains the provided record.
+     * <p>
+     * This method may also be used to create a "degenerate" segment with {@code null} value and
+     * {@code validFrom} timestamp equal to {@code validTo}. (For more on degenerate segments,
+     * see the main javadoc for this class.)
+     *
+     * @param value the record value
+     * @param validFrom the record's (validFrom) timestamp
+     * @param validTo the record's validTo timestamp
+     * @return the newly created segment value
+     */
+    static SegmentValue newSegmentValueWithRecord(
+        final byte[] value, final long validFrom, final long validTo) {
+        return new PartiallyDeserializedSegmentValue(value, validFrom, validTo);
+    }
+
+    interface SegmentValue {
+
+        /**
+         * Finds the latest record in this segment with (validFrom) timestamp not exceeding the
+         * provided timestamp bound. This method requires that the provided timestamp bound exists
+         * in this segment, i.e., that the provided timestamp bound is at least minTimestamp and
+         * is smaller than nextTimestamp. As a result of this requirement, it is not permitted to
+         * call this method on degenerate segments.

Review Comment:
   Fair point. I've removed these in https://github.com/apache/kafka/pull/13186.
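
To make the precondition in the quoted `find` javadoc concrete, here is a small sketch of the lookup rule it describes. This is an illustration under stated assumptions, not the implementation in this PR: `SegmentFindSketch` and its `find` signature are hypothetical, the real method's `boolean` parameter is not modelled, and the segment is reduced to an array of validFrom timestamps in index order (latest first) plus `nextTimestamp`.

```java
// Illustrative only: a hypothetical helper, not SegmentValue#find from the PR.
final class SegmentFindSketch {

    /**
     * Returns the position (in latest-first index order) of the latest record
     * whose validFrom timestamp does not exceed the given timestamp bound.
     */
    static int find(final long[] validFromsLatestFirst,
                    final long nextTimestamp,
                    final long timestamp) {
        final long minTimestamp = validFromsLatestFirst[validFromsLatestFirst.length - 1];
        // The bound must lie in [minTimestamp, nextTimestamp). A degenerate
        // segment has minTimestamp == nextTimestamp, so no bound can pass.
        if (timestamp < minTimestamp || timestamp >= nextTimestamp) {
            throw new IllegalArgumentException(
                "Timestamp " + timestamp + " is outside [" + minTimestamp + ", " + nextTimestamp + ")");
        }
        for (int i = 0; i < validFromsLatestFirst.length; i++) {
            if (validFromsLatestFirst[i] <= timestamp) {
                return i;
            }
        }
        // Not reachable: the precondition guarantees the earliest record matches.
        throw new IllegalStateException("No record found despite valid timestamp bound");
    }
}
```

With timestamps `{30, 20, 10}` and `nextTimestamp` 40, a bound of 25 selects the record at position 1 (validFrom 20). For a degenerate segment such as `{5}` with `nextTimestamp` 5, the interval is empty, so every call is rejected by the bounds check, which is the point the javadoc makes about callers never reaching `find` on degenerate segments.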



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
