vcrfxia commented on code in PR #13189: URL: https://github.com/apache/kafka/pull/13189#discussion_r1101924540
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -273,7 +290,41 @@ public void init(final StateStoreContext context, final StateStore root) { // VisibleForTesting void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) { - throw new UnsupportedOperationException("not yet implemented"); + // advance stream time to the max timestamp in the batch + for (final ConsumerRecord<byte[], byte[]> record : records) { + observedStreamTime = Math.max(observedStreamTime, record.timestamp()); + } + + final VersionedStoreClient<?> restoreClient = restoreWriteBuffer.getClient(); Review Comment: The VersionedStoreSegment implementation used by the restore client (WriteBufferSegmentWithDbFallback) is currently private to the write buffer class, since RocksDBVersionedStore doesn't care what the segment type is; all the outer class needs are the methods provided by the VersionedStoreClient interface itself. So I've left the type parameter as a wildcard (`VersionedStoreClient<?>`) in order to avoid polluting the outer class with extra info it doesn't need. ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java: ########## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This + * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may + * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during + * restoration. + * <p> + * The structure of the internals of this write buffer mirrors the structure of the + * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the + * segment stores is buffered in a separate object -- specifically, a map. + */ +public class RocksDBVersionedStoreRestoreWriteBuffer { + + private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class); + + // write buffer for latest value store. value type is Optional in order to track tombstones + // which must be written to the underlying store. + private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer; + // map from segment id to write buffer. 
segments are stored in reverse-sorted order, + // so getReverseSegments() is more efficient + private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer; + private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient; + private final RocksDBVersionedStoreRestoreClient restoreClient; + + /** + * Creates a new write buffer. + * @param dbClient client for reading from and writing to the underlying persistent store + */ + RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) { + this.dbClient = Objects.requireNonNull(dbClient); + + this.latestValueWriteBuffer = new HashMap<>(); + // store in reverse-sorted order, to make getReverseSegments() more efficient + this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x)); + this.restoreClient = new RocksDBVersionedStoreRestoreClient(); + } + + /** + * @return client for writing to (and reading from) the write buffer + */ + VersionedStoreClient<?> getClient() { Review Comment: See above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org