vcrfxia commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1101924540


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -273,7 +290,41 @@ public void init(final StateStoreContext context, final 
StateStore root) {
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> 
records) {
-        throw new UnsupportedOperationException("not yet implemented");
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            observedStreamTime = Math.max(observedStreamTime, 
record.timestamp());
+        }
+
+        final VersionedStoreClient<?> restoreClient = 
restoreWriteBuffer.getClient();

Review Comment:
   The VersionedStoreSegment implementation used by the restore client 
(WriteBufferSegmentWithDbFallback) is currently private to the write buffer 
class, since the RocksDBVersionedStore doesn't care what the type is; all the 
outer class needs are the methods provided by the RocksDBVersionedStoreClient 
interface itself. So I've left the type out in order to avoid polluting the 
outer class with extra info it doesn't need.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from 
its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write 
buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write 
efficiency during
+ * restoration.
+ * <p>
+ * The structure of the internals of this write buffer mirrors the structure 
of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store 
and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+    private static final Logger log = 
LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+    // write buffer for latest value store. value type is Optional in order to 
track tombstones
+    // which must be written to the underlying store.
+    private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+    // map from segment id to write buffer. segments are stored in 
reverse-sorted order,
+    // so getReverseSegments() is more efficient
+    private final TreeMap<Long, WriteBufferSegmentWithDbFallback> 
segmentsWriteBuffer;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> 
dbClient;
+    private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+    /**
+     * Creates a new write buffer.
+     * @param dbClient client for reading from and writing to the underlying 
persistent store
+     */
+    RocksDBVersionedStoreRestoreWriteBuffer(final 
BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) {
+        this.dbClient = Objects.requireNonNull(dbClient);
+
+        this.latestValueWriteBuffer = new HashMap<>();
+        // store in reverse-sorted order, to make getReverseSegments() more 
efficient
+        this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+        this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+    }
+
+    /**
+     * @return client for writing to (and reading from) the write buffer
+     */
+    VersionedStoreClient<?> getClient() {

Review Comment:
   See above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to