vcrfxia commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1099321787


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ * <p>
+ * This store implementation consists of a "latest value store" and "segment stores." The latest
+ * record version for each key is stored in the latest value store, while older record versions
+ * are stored in the segment stores. Conceptually, each record version has two associated
+ * timestamps:
+ * <ul>
+ *     <li>a {@code validFrom} timestamp. This timestamp is explicitly associated with the record
+ *     as part of the {@link VersionedKeyValueStore#put(Object, Object, long)} call to the store;
+ *     i.e., this is the record's timestamp.</li>
+ *     <li>a {@code validTo} timestamp. This is the timestamp of the next record (or deletion)
+ *     associated with the same key, and is implicitly associated with the record. This timestamp
+ *     can change as new records are inserted into the store.</li>
+ * </ul>
+ * The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and
+ * can change as new record versions are inserted into the store (and validTo changes as a result).
+ * <p>
+ * Old record versions are stored in segment stores according to their validTo timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can be dropped from the
+ * store at a time, once the records contained in the segment are no longer relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same key) within a single
+ * segment are stored together using the format specified in {@link RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class);
+    // a marker to indicate that no record version has yet been found as part of an ongoing
+    // put() procedure. any value which is not a valid record timestamp will do.
+    private static final long SENTINEL_TIMESTAMP = Long.MIN_VALUE;
+
+    private final String name;
+    private final long historyRetention;
+    private final RocksDBMetricsRecorder metricsRecorder;
+
+    private final RocksDBStore latestValueStore;
+    private final LogicalKeyValueSegments segmentStores;
+    private final LatestValueSchema latestValueSchema;
+    private final SegmentValueSchema segmentValueSchema;
+    private final VersionedStoreClient<LogicalKeyValueSegment> versionedStoreClient;
+
+    private ProcessorContext context;
+    private StateStoreContext stateStoreContext;
+    private Sensor expiredRecordSensor;
+    private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+    private Position position;
+    private OffsetCheckpoint positionCheckpoint;
+    private volatile boolean open;
+
+    RocksDBVersionedStore(final String name, final String metricsScope, final long historyRetention, final long segmentInterval) {
+        this.name = name;
+        this.historyRetention = historyRetention;
+        this.metricsRecorder = new RocksDBMetricsRecorder(metricsScope, name);
+        this.latestValueStore = new RocksDBStore(lvsName(name), name, metricsRecorder);
+        this.segmentStores = new LogicalKeyValueSegments(segmentsName(name), name, historyRetention, segmentInterval, metricsRecorder);
+        this.latestValueSchema = new LatestValueSchema();
+        this.segmentValueSchema = new SegmentValueSchema();
+        this.versionedStoreClient = new RocksDBVersionedStoreClient();
+    }
+
+    @Override
+    public void put(final Bytes key, final byte[] value, final long timestamp) {
+
+        doPut(
+            latestValueSchema,
+            segmentValueSchema,
+            versionedStoreClient,
+            Optional.of(expiredRecordSensor),
+            context,
+            observedStreamTime,
+            historyRetention,
+            key,
+            value,
+            timestamp
+        );
+
+        observedStreamTime = Math.max(observedStreamTime, timestamp);
+    }
+
+    @Override
+    public VersionedRecord<byte[]> delete(final Bytes key, final long timestamp) {
+        final VersionedRecord<byte[]> existingRecord = get(key, timestamp);
+        put(key, null, timestamp);
+        return existingRecord;
+    }
+
+    @Override
+    public VersionedRecord<byte[]> get(final Bytes key) {
+        // latest value (if present) is guaranteed to be in the latest value store
+        final byte[] latestValue = latestValueStore.get(key);
+        if (latestValue != null) {
+            return new VersionedRecord<>(
+                latestValueSchema.getValue(latestValue),
+                latestValueSchema.getTimestamp(latestValue)
+            );
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
+
+        if (asOfTimestamp < observedStreamTime - historyRetention) {
+            // history retention has elapsed. return null for predictability, even if data
+            // is still present in store.
+            return null;
+        }
+
+        // first check the latest value store
+        final byte[] latestValue = latestValueStore.get(key);
+        if (latestValue != null) {
+            final long latestTimestamp = latestValueSchema.getTimestamp(latestValue);
+            if (latestTimestamp <= asOfTimestamp) {
+                return new VersionedRecord<>(latestValueSchema.getValue(latestValue), latestTimestamp);
+            }
+        }
+
+        // check segment stores
+        final List<LogicalKeyValueSegment> segments = segmentStores.segments(asOfTimestamp, Long.MAX_VALUE, false);
+        for (final LogicalKeyValueSegment segment : segments) {
+            final byte[] segmentValue = segment.get(key);
+            if (segmentValue != null) {
+                final long nextTs = segmentValueSchema.getNextTimestamp(segmentValue);
+                if (nextTs <= asOfTimestamp) {
+                    // this segment contains no data for the queried timestamp, so earlier segments
+                    // cannot either
+                    return null;
+                }
+
+                if (segmentValueSchema.getMinTimestamp(segmentValue) > asOfTimestamp) {
+                    // the segment only contains data for after the queried timestamp. skip and
+                    // continue the search to earlier segments. as an optimization, this code
+                    // could be updated to skip forward to the segment containing the minTimestamp
+                    // in the if-condition above.
+                    continue;
+                }
+
+                // the desired result is contained in this segment
+                final SegmentSearchResult searchResult =
+                    segmentValueSchema.deserialize(segmentValue).find(asOfTimestamp, true);
+                if (searchResult.value() != null) {
+                    return new VersionedRecord<>(searchResult.value(), searchResult.validFrom());
+                } else {
+                    return null;
+                }
+            }
+        }
+
+        // checked all segments and no results found
+        return null;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public void flush() {
+        // order shouldn't matter since failure to flush is a fatal exception
+        segmentStores.flush();
+        latestValueStore.flush();
+    }
+
+    @Override
+    public void close() {
+        open = false;
+
+        // close latest value store first so that calls to get() immediately begin to fail with
+        // store not open, as all calls to get() first get() from latest value store
+        latestValueStore.close();
+        segmentStores.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public Position getPosition() {
+        return position;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        this.context = context;
+
+        final StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context);
+        final String threadId = Thread.currentThread().getName();
+        final String taskName = context.taskId().toString();
+
+        expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
+            threadId,
+            taskName,
+            metrics
+        );
+
+        metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId());
+
+        latestValueStore.openDB(context.appConfigs(), context.stateDir());
+        segmentStores.openExisting(context, observedStreamTime);
+
+        final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+
+        // register and possibly restore the state from the logs
+        stateStoreContext.register(
+            root,
+            (RecordBatchingStateRestoreCallback) RocksDBVersionedStore.this::restoreBatch,
+            () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
+        );
+
+        open = true;
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        this.stateStoreContext = context;
+        init(StoreToProcessorContextAdapter.adapt(context), root);
+    }
+
+    // VisibleForTesting
+    void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
+        throw new UnsupportedOperationException("not yet implemented");
+    }
+
+    /**
+     * Generic interface for segment stores. See {@link VersionedStoreClient} for use.
+     */
+    interface VersionedStoreSegment {
+
+        /**
+         * @return segment id
+         */
+        long id();
+
+        void put(Bytes key, byte[] value);
+
+        byte[] get(Bytes key);
+    }
+
+    /**
+     * Extracts all operations required for writing to the versioned store (via
+     * {@link #put(Bytes, byte[], long)}) into a generic client interface, so that the same
+     * {@code put(...)} logic can be shared during regular store operation and during restore.
+     *
+     * @param <T> the segment type used by this client
+     */
+    interface VersionedStoreClient<T extends VersionedStoreSegment> {
+
+        /**
+         * @return the contents of the latest value store, for the given key
+         */
+        byte[] getLatestValue(Bytes key);
+
+        /**
+         * Puts the provided key and value into the latest value store.
+         */
+        void putLatestValue(Bytes key, byte[] value);
+
+        /**
+         * Deletes the existing value (if any) from the latest value store, for the given key.
+         */
+        void deleteLatestValue(Bytes key);
+
+        /**
+         * @return the segment with the provided id, or {@code null} if the segment is expired
+         */
+        T getOrCreateSegmentIfLive(long segmentId, ProcessorContext context, long streamTime);
+
+        /**
+         * @return all segments in the store which contain timestamps at least the provided
+         *         timestamp bound, in reverse order by segment id (and time), i.e., such that
+         *         the most recent segment is first
+         */
+        List<T> getReverseSegments(long timestampFrom);
+
+        /**
+         * @return the segment id associated with the provided timestamp
+         */
+        long segmentIdForTimestamp(long timestamp);
+    }
+
+    /**
+     * Client for writing into (and reading from) this persistent {@link RocksDBVersionedStore}.
+     */
+    private class RocksDBVersionedStoreClient implements VersionedStoreClient<LogicalKeyValueSegment> {
+
+        @Override
+        public byte[] getLatestValue(final Bytes key) {
+            return latestValueStore.get(key);
+        }
+
+        @Override
+        public void putLatestValue(final Bytes key, final byte[] value) {
+            latestValueStore.put(key, value);
+        }
+
+        @Override
+        public void deleteLatestValue(final Bytes key) {
+            latestValueStore.delete(key);
+        }
+
+        @Override
+        public LogicalKeyValueSegment getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime) {
+            return segmentStores.getOrCreateSegmentIfLive(segmentId, context, streamTime);
+        }
+
+        @Override
+        public List<LogicalKeyValueSegment> getReverseSegments(final long timestampFrom) {
+            return segmentStores.segments(timestampFrom, Long.MAX_VALUE, false);
+        }
+
+        @Override
+        public long segmentIdForTimestamp(final long timestamp) {
+            return segmentStores.segmentId(timestamp);
+        }
+    }
+
+    private static <T extends VersionedStoreSegment> void doPut(
+        final LatestValueSchema latestValueSchema,
+        final SegmentValueSchema segmentValueSchema,
+        final VersionedStoreClient<T> versionedStoreClient,
+        final Optional<Sensor> expiredRecordSensor,
+        final ProcessorContext context,
+        final long observedStreamTime,
+        final long historyRetention,
+        final Bytes key,
+        final byte[] value,
+        final long timestamp
+    ) {
+        // track the smallest timestamp seen so far that is larger than the insertion timestamp.
+        // this timestamp determines, based on all segments searched so far, which segment the
+        // new record should be inserted into. the starting "sentinel timestamp" represents
+        // that the record should be inserted into the latest value store.
+        long foundTs = SENTINEL_TIMESTAMP;
+
+        // check latest value store
+        PutStatus status = putLatestValueStore(
+            latestValueSchema,
+            segmentValueSchema,
+            versionedStoreClient,
+            context,
+            observedStreamTime,
+            key,
+            value,
+            timestamp,
+            foundTs
+        );
+        if (status.isComplete) {
+            return;
+        } else {
+            foundTs = status.foundTs;
+        }
+
+        // continue search in segments
+        status = putSegments(
+            segmentValueSchema,
+            versionedStoreClient,
+            expiredRecordSensor,
+            context,
+            observedStreamTime,
+            historyRetention,
+            key,
+            value,
+            timestamp,
+            foundTs
+        );
+        if (status.isComplete) {
+            return;
+        } else {
+            foundTs = status.foundTs;
+        }
+
+        // ran out of segments to search. insert into segment as specified by foundTs
+        putFallThrough(

Review Comment:
   Renamed to `finishPut()` for now.
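
   For context, here is a minimal sketch of the validFrom/validTo semantics described in the class Javadoc, using only the `put` and `get` methods that appear in this diff. The `VersionedStoreSketch` class and `illustrate` method are hypothetical scaffolding, and the store passed in is assumed to already be initialized; keys, values, and timestamps are illustrative only.

   ```java
   import java.nio.charset.StandardCharsets;
   import org.apache.kafka.common.utils.Bytes;
   import org.apache.kafka.streams.state.VersionedKeyValueStore;
   import org.apache.kafka.streams.state.VersionedRecord;

   class VersionedStoreSketch {

       // Sketch only: `store` is assumed to be an already-initialized versioned store
       // (e.g., an opened RocksDBVersionedStore); values below are made up.
       static void illustrate(final VersionedKeyValueStore<Bytes, byte[]> store) {
           final Bytes key = Bytes.wrap("k1".getBytes(StandardCharsets.UTF_8));

           // Insert v1 at time 10: v1 is valid from 10 (inclusive) onward.
           store.put(key, "v1".getBytes(StandardCharsets.UTF_8), 10L);

           // Insert v2 at time 20: v1's validTo implicitly becomes 20 (exclusive),
           // and v2 becomes the latest record version for this key.
           store.put(key, "v2".getBytes(StandardCharsets.UTF_8), 20L);

           // Latest-value query is served from the latest value store: v2, timestamp 20.
           final VersionedRecord<byte[]> latest = store.get(key);

           // Timestamped query at t=15 falls in v1's validity interval [10, 20): v1, timestamp 10.
           final VersionedRecord<byte[]> asOf = store.get(key, 15L);

           // Queries for timestamps older than observedStreamTime - historyRetention return
           // null for predictability, even if the data is still physically present.
       }
   }
   ```

   The second `put` is what implicitly closes v1's validity interval: per the class Javadoc, v1 then belongs in a segment store chosen by its new validTo of 20, while v2 lives in the latest value store.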


