This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 528a777df66 KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog (#13189)
528a777df66 is described below

commit 528a777df66ab1e66544ad2a8fb3d020ff229bff
Author: Victoria Xia <victoria....@confluent.io>
AuthorDate: Mon Feb 13 17:06:44 2023 -0800

    KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog (#13189)

    This PR builds on the new RocksDB-based versioned store implementation (see KIP-889)
    by adding code for restoring from changelog. The changelog topic format is the same as
    for regular timestamped key-value stores: record keys, values, and timestamps are
    stored in the Kafka message key, value, and timestamp, respectively. The code for
    actually writing to this changelog will come in a follow-up PR.

    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../state/internals/RocksDBVersionedStore.java     |  90 ++++++-
 .../RocksDBVersionedStoreRestoreWriteBuffer.java   | 269 +++++++++++++++++++++
 .../state/internals/RocksDBVersionedStoreTest.java | 150 ++++++++++++
 3 files changed, 502 insertions(+), 7 deletions(-)
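For illustration: the changelog format described above means restore consumes plain ConsumerRecords whose key, value, and timestamp carry the store key, value, and version timestamp. A minimal sketch of building such a record, mirroring the getChangelogRecords() helper in the test diff below (the topic name, partition, and offset are placeholders, not part of the commit):

    import java.util.Optional;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.common.record.TimestampType;

    class ChangelogRecordSketch {
        // the record key, value, and timestamp map directly onto the Kafka message
        // key, value, and timestamp -- the same format as for regular timestamped
        // key-value stores
        static ConsumerRecord<byte[], byte[]> changelogRecord(final byte[] key, final byte[] value, final long timestamp) {
            return new ConsumerRecord<>(
                "changelog-topic",  // placeholder topic
                0,                  // placeholder partition
                0L,                 // placeholder offset
                timestamp,
                TimestampType.CREATE_TIME,
                key.length,
                value == null ? 0 : value.length,
                key,
                value,
                new RecordHeaders(),
                Optional.empty()    // no leader epoch
            );
        }
    }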
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
index e40fc36e5bb..f56ceccc0b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Collection;
@@ -24,9 +26,13 @@ import java.util.Optional;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
@@ -38,6 +44,8 @@ import org.apache.kafka.streams.state.VersionedRecord;
 import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
 import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,12 +86,14 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
     private final RocksDBStore latestValueStore;
     private final LogicalKeyValueSegments segmentStores;
-    private final VersionedStoreClient<LogicalKeyValueSegment> versionedStoreClient;
+    private final RocksDBVersionedStoreClient versionedStoreClient;
+    private final RocksDBVersionedStoreRestoreWriteBuffer restoreWriteBuffer;
 
     private ProcessorContext context;
     private StateStoreContext stateStoreContext;
     private Sensor expiredRecordSensor;
     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+    private boolean consistencyEnabled = false;
     private Position position;
     private OffsetCheckpoint positionCheckpoint;
     private volatile boolean open;
@@ -95,6 +105,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         this.latestValueStore = new RocksDBStore(latestValueStoreName(name), name, metricsRecorder);
         this.segmentStores = new LogicalKeyValueSegments(segmentsStoreName(name), name, historyRetention, segmentInterval, metricsRecorder);
         this.versionedStoreClient = new RocksDBVersionedStoreClient();
+        this.restoreWriteBuffer = new RocksDBVersionedStoreRestoreWriteBuffer(versionedStoreClient);
     }
 
     @Override
@@ -121,11 +132,11 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
     @Override
     public VersionedRecord<byte[]> get(final Bytes key) {
         // latest value (if present) is guaranteed to be in the latest value store
-        final byte[] latestValue = latestValueStore.get(key);
-        if (latestValue != null) {
+        final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
+        if (rawLatestValueAndTimestamp != null) {
             return new VersionedRecord<>(
-                LatestValueFormatter.getValue(latestValue),
-                LatestValueFormatter.getTimestamp(latestValue)
+                LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
+                LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp)
             );
         } else {
             return null;
@@ -256,6 +267,12 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         );
 
         open = true;
+
+        consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
+            context.appConfigs(),
+            IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
+            false
+        );
     }
 
     @Override
@@ -266,7 +283,41 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        throw new UnsupportedOperationException("not yet implemented");
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+        }
+
+        final VersionedStoreClient<?> restoreClient = restoreWriteBuffer.getClient();
+
+        // note: there is an increased risk of hitting an out-of-memory error during this
+        // restore loop, compared to non-versioned key-value stores, because this versioned
+        // store implementation stores multiple records (for the same key) together in a single
+        // RocksDB "segment" entry -- restoring a single changelog entry could require loading
+        // multiple records into memory. how high this memory amplification will be is very much
+        // dependent on the specific workload and the value of the "segment interval" parameter.
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
+                record,
+                consistencyEnabled,
+                position
+            );
+
+            // put records to write buffer
+            doPut(
+                restoreClient,
+                Optional.empty(),
+                new Bytes(record.key()),
+                record.value(),
+                record.timestamp()
+            );
+        }
+
+        try {
+            restoreWriteBuffer.flush();
+        } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error restoring batch to store " + name, e);
+        }
     }
 
     /**
@@ -329,7 +380,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
     /**
      * Client for writing into (and reading from) this persistent {@link RocksDBVersionedStore}.
      */
-    private class RocksDBVersionedStoreClient implements VersionedStoreClient<LogicalKeyValueSegment> {
+    class RocksDBVersionedStoreClient implements VersionedStoreClient<LogicalKeyValueSegment> {
 
         @Override
         public byte[] getLatestValue(final Bytes key) {
@@ -360,6 +411,31 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         public long segmentIdForTimestamp(final long timestamp) {
             return segmentStores.segmentId(timestamp);
         }
+
+        /**
+         * Adds the provided record into the provided batch, in preparation for writing into the
+         * latest value store.
+         * <p>
+         * Together with {@link #writeLatestValues(WriteBatch)}, this method supports batch writes
+         * into the latest value store.
+         *
+         * @throws RocksDBException if a failure occurs adding the record to the {@link WriteBatch}
+         */
+        public void addToLatestValueBatch(final KeyValue<byte[], byte[]> record, final WriteBatch batch) throws RocksDBException {
+            latestValueStore.addToBatch(record, batch);
+        }
+
+        /**
+         * Writes the provided batch of records into the latest value store.
+         * <p>
+         * Together with {@link #addToLatestValueBatch(KeyValue, WriteBatch)}, this method supports
+         * batch writes into the latest value store.
+         *
+         * @throws RocksDBException if a failure occurs while writing the {@link WriteBatch} to the store
+         */
+        public void writeLatestValues(final WriteBatch batch) throws RocksDBException {
+            latestValueStore.write(batch);
+        }
     }
 
     private <T extends VersionedStoreSegment> void doPut(
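The addToLatestValueBatch()/writeLatestValues() pair added above follows RocksDB's standard batched-write pattern: stage updates in a WriteBatch, then apply them in a single write. A self-contained sketch of that pattern against a bare RocksDB instance (the database path and key/value contents are made up for illustration):

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    class WriteBatchSketch {
        public static void main(final String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (final Options options = new Options().setCreateIfMissing(true);
                 final RocksDB db = RocksDB.open(options, "/tmp/write-batch-sketch");
                 final WriteBatch batch = new WriteBatch();
                 final WriteOptions writeOptions = new WriteOptions()) {
                // stage several updates in memory ...
                batch.put("k1".getBytes(), "v1".getBytes());
                batch.put("k2".getBytes(), "v2".getBytes());
                batch.delete("k3".getBytes());  // tombstones can be staged too
                // ... then apply them to the store in one write
                db.write(writeOptions, batch);
            }
        }
    }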
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
new file mode 100644
index 00000000000..e50913d8ad1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.RocksDBVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during
+ * restoration.
+ * <p>
+ * The structure of the internals of this write buffer mirrors the structure of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+    // write buffer for latest value store. value type is Optional in order to track tombstones
+    // which must be written to the underlying store.
+    private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+    // map from segment id to write buffer. segments are stored in reverse-sorted order,
+    // so getReverseSegments() is more efficient
+    private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+    private final RocksDBVersionedStoreClient dbClient;
+    private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+    /**
+     * Creates a new write buffer.
+     * @param dbClient client for reading from and writing to the underlying persistent store
+     */
+    RocksDBVersionedStoreRestoreWriteBuffer(final RocksDBVersionedStoreClient dbClient) {
+        this.dbClient = Objects.requireNonNull(dbClient);
+
+        this.latestValueWriteBuffer = new HashMap<>();
+        // store in reverse-sorted order, to make getReverseSegments() more efficient
+        this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+        this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+    }
+
+    /**
+     * @return client for writing to (and reading from) the write buffer
+     */
+    VersionedStoreClient<?> getClient() {
+        return restoreClient;
+    }
+
+    /**
+     * Flushes the contents of the write buffer into the persistent store, and clears the write
+     * buffer in the process.
+     * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+     */
+    void flush() throws RocksDBException {
+
+        // flush segments first, as this is consistent with the store always writing to
+        // older segments/stores before later ones
+        try (final WriteBatch segmentsBatch = new WriteBatch()) {
+            final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+            if (allSegments.size() > 0) {
+                // collect entries into write batch
+                for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+                    final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+                    for (final Map.Entry<Bytes, byte[]> segmentEntry : bufferSegment.getAll().entrySet()) {
+                        dbSegment.addToBatch(
+                            new KeyValue<>(segmentEntry.getKey().get(), segmentEntry.getValue()),
+                            segmentsBatch);
+                    }
+                }
+
+                // write to db. all the logical segments share the same physical store,
+                // so we can use any segment to perform the write
+                allSegments.get(0).dbSegment().write(segmentsBatch);
+            }
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore segments store.");
+            throw e;
+        }
+        segmentsWriteBuffer.clear();
+
+        // flush latest value store
+        try (final WriteBatch latestValueBatch = new WriteBatch()) {
+            // collect entries into write batch
+            for (final Map.Entry<Bytes, Optional<byte[]>> latestValueEntry : latestValueWriteBuffer.entrySet()) {
+                final byte[] value = latestValueEntry.getValue().orElse(null);
+                dbClient.addToLatestValueBatch(
+                    new KeyValue<>(latestValueEntry.getKey().get(), value),
+                    latestValueBatch);
+            }
+
+            // write to db
+            dbClient.writeLatestValues(latestValueBatch);
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore latest value store.");
+            throw e;
+        }
+        latestValueWriteBuffer.clear();
+    }
+
+    /**
+     * The object representation of the write buffer corresponding to a single segment store.
+     * Contains the write buffer itself (a simple hash map) and also a reference to the underlying
+     * persistent segment store.
+     */
+    private class WriteBufferSegmentWithDbFallback implements VersionedStoreSegment {
+
+        private final long id;
+        private final Map<Bytes, byte[]> data;
+        private final LogicalKeyValueSegment dbSegment;
+
+        WriteBufferSegmentWithDbFallback(final LogicalKeyValueSegment dbSegment) {
+            this.dbSegment = Objects.requireNonNull(dbSegment);
+            this.id = dbSegment.id();
+            this.data = new HashMap<>();
+
+            // register segment with segments store
+            segmentsWriteBuffer.put(id, this);
+        }
+
+        LogicalKeyValueSegment dbSegment() {
+            return dbSegment;
+        }
+
+        @Override
+        public long id() {
+            return id;
+        }
+
+        @Override
+        public void put(final Bytes key, final byte[] value) {
+            // all writes go to the write buffer
+            data.put(key, value);
+        }
+
+        @Override
+        public byte[] get(final Bytes key) {
+            final byte[] bufferValue = data.get(key);
+            if (bufferValue != null) {
+                return bufferValue;
+            }
+            return dbSegment.get(key);
+        }
+
+        Map<Bytes, byte[]> getAll() {
+            return Collections.unmodifiableMap(data);
+        }
+    }
+
+    /**
+     * Client for writing to (and reading from) the write buffer as part of restore.
+     */
+    private class RocksDBVersionedStoreRestoreClient implements VersionedStoreClient<WriteBufferSegmentWithDbFallback> {
+
+        @Override
+        public byte[] getLatestValue(final Bytes key) {
+            final Optional<byte[]> bufferValue = latestValueWriteBuffer.get(key);
+            if (bufferValue != null) {
+                return bufferValue.orElse(null);
+            }
+            return dbClient.getLatestValue(key);
+        }
+
+        @Override
+        public void putLatestValue(final Bytes key, final byte[] value) {
+            // all writes go to write buffer
+            latestValueWriteBuffer.put(key, Optional.ofNullable(value));
+        }
+
+        @Override
+        public void deleteLatestValue(final Bytes key) {
+            putLatestValue(key, null);
+        }
+
+        @Override
+        public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime) {
+            if (segmentsWriteBuffer.containsKey(segmentId)) {
+                return segmentsWriteBuffer.get(segmentId);
+            }
+
+            final LogicalKeyValueSegment dbSegment = dbClient.getOrCreateSegmentIfLive(segmentId, context, streamTime);
+            if (dbSegment == null) {
+                // segment is not live
+                return null;
+            }
+            // creating a new segment automatically registers it with the segments store
+            return new WriteBufferSegmentWithDbFallback(dbSegment);
+        }
+
+        @Override
+        public List<WriteBufferSegmentWithDbFallback> getReverseSegments(final long timestampFrom) {
+            // head and not tail because the map is sorted in reverse order
+            final long segmentFrom = segmentIdForTimestamp(timestampFrom);
+            final List<WriteBufferSegmentWithDbFallback> bufferSegments =
+                new ArrayList<>(segmentsWriteBuffer.headMap(segmentFrom, true).values());
+
+            final List<LogicalKeyValueSegment> dbSegments = dbClient.getReverseSegments(timestampFrom);
+
+            // merge segments from db with segments from write buffer
+            final List<WriteBufferSegmentWithDbFallback> allSegments = new ArrayList<>();
+            int dbIndex = 0;
+            int bufferIndex = 0;
+            while (dbIndex < dbSegments.size() && bufferIndex < bufferSegments.size()) {
+                final LogicalKeyValueSegment dbSegment = dbSegments.get(dbIndex);
+                final WriteBufferSegmentWithDbFallback bufferSegment = bufferSegments.get(bufferIndex);
+                final long dbSegmentId = dbSegment.id();
+                final long bufferSegmentId = bufferSegment.id();
+                if (dbSegmentId > bufferSegmentId) {
+                    // creating a new segment automatically registers it with the segments store
+                    allSegments.add(new WriteBufferSegmentWithDbFallback(dbSegment));
+                    dbIndex++;
+                } else if (dbSegmentId < bufferSegmentId) {
+                    allSegments.add(bufferSegment);
+                    bufferIndex++;
+                } else {
+                    allSegments.add(bufferSegment);
+                    dbIndex++;
+                    bufferIndex++;
+                }
+            }
+            while (dbIndex < dbSegments.size()) {
+                // creating a new segment automatically registers it with the segments store
+                allSegments.add(new WriteBufferSegmentWithDbFallback(dbSegments.get(dbIndex)));
+                dbIndex++;
+            }
+            while (bufferIndex < bufferSegments.size()) {
+                allSegments.add(bufferSegments.get(bufferIndex));
+                bufferIndex++;
+            }
+            return allSegments;
+        }
+
+        @Override
+        public long segmentIdForTimestamp(final long timestamp) {
+            return dbClient.segmentIdForTimestamp(timestamp);
+        }
+    }
+}
\ No newline at end of file
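Of note in the new class: getReverseSegments() merges two reverse-sorted segment lists with a two-pointer walk, preferring the buffered segment when ids collide. The same merge, sketched on plain descending-sorted lists of ids (a standalone illustration, not part of the commit):

    import java.util.ArrayList;
    import java.util.List;

    class ReverseMergeSketch {
        // merge two lists sorted in descending order, dropping duplicate ids;
        // entries from 'preferred' win on ties, mirroring how write-buffer
        // segments shadow their db counterparts
        static List<Long> mergeDescending(final List<Long> preferred, final List<Long> other) {
            final List<Long> merged = new ArrayList<>();
            int i = 0;
            int j = 0;
            while (i < preferred.size() && j < other.size()) {
                if (other.get(j) > preferred.get(i)) {
                    merged.add(other.get(j));
                    j++;
                } else if (other.get(j) < preferred.get(i)) {
                    merged.add(preferred.get(i));
                    i++;
                } else {
                    merged.add(preferred.get(i));  // tie: prefer the buffered segment
                    i++;
                    j++;
                }
            }
            while (j < other.size()) {
                merged.add(other.get(j++));
            }
            while (i < preferred.size()) {
                merged.add(preferred.get(i++));
            }
            return merged;
        }
    }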
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
index 0e7983a009a..fb97ad3a6b7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
@@ -20,6 +20,12 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
@@ -409,6 +415,114 @@ public class RocksDBVersionedStoreTest {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
     }
 
+    @Test
+    public void shouldRestore() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "vp20", SEGMENT_INTERVAL + 20));
+        records.add(new DataRecord("k", "vp10", SEGMENT_INTERVAL + 10));
+        records.add(new DataRecord("k", "vn10", SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", "vn2", SEGMENT_INTERVAL - 2));
+        records.add(new DataRecord("k", "vn1", SEGMENT_INTERVAL - 1));
+        records.add(new DataRecord("k", "vp1", SEGMENT_INTERVAL + 1));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "vp20", SEGMENT_INTERVAL + 20);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 30, "vp20", SEGMENT_INTERVAL + 20);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10", SEGMENT_INTERVAL + 10);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 5, "vp1", SEGMENT_INTERVAL + 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL, "vn1", SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 1, "vn1", SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 2, "vn2", SEGMENT_INTERVAL - 2);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10", SEGMENT_INTERVAL - 10);
+    }
+
+    @Test
+    public void shouldRestoreWithNulls() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 1));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 1));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 10));
+        records.add(new DataRecord("k", "vp5", SEGMENT_INTERVAL + 5));
+        records.add(new DataRecord("k", "vn5", SEGMENT_INTERVAL - 5));
+        records.add(new DataRecord("k", "vn6", SEGMENT_INTERVAL - 6));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetNullFromStore("k");
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5", SEGMENT_INTERVAL + 5);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
+    }
+
+    @Test
+    public void shouldRestoreWithNullsAndRepeatTimestamps() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL + 20));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL - 10)); // replaces existing null with non-null, with timestamps spanning segments
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10)); // replaces existing non-null with null
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL - 1));
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL + 1));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 1)); // replaces existing non-null with null
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 1)); // replaces existing non-null with null, with timestamps spanning segments
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 10));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 5));
+        records.add(new DataRecord("k", "vp5", SEGMENT_INTERVAL + 5)); // replaces existing null with non-null
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL - 5));
+        records.add(new DataRecord("k", "vn5", SEGMENT_INTERVAL - 5)); // replaces existing non-null with non-null
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20)); // replaces existing non-null (latest value) with null
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20)); // replaces existing null with null
+        records.add(new DataRecord("k", "vn6", SEGMENT_INTERVAL - 6));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetNullFromStore("k");
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5", SEGMENT_INTERVAL + 5);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
+    }
+
+    @Test
+    public void shouldRestoreMultipleBatches() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 20));
+        records.add(new DataRecord("k", "vn10", SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 1));
+
+        final List<DataRecord> moreRecords = new ArrayList<>();
+        moreRecords.add(new DataRecord("k", null, SEGMENT_INTERVAL + 1));
+        moreRecords.add(new DataRecord("k", "vp10", SEGMENT_INTERVAL + 10));
+        moreRecords.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20));
+
+        store.restoreBatch(getChangelogRecords(records));
+        store.restoreBatch(getChangelogRecords(moreRecords));
+
+        verifyGetNullFromStore("k");
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10", SEGMENT_INTERVAL + 10);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 5);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10", SEGMENT_INTERVAL - 10);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
+    }
+
     private void putToStore(final String key, final String value, final long timestamp) {
         store.put(
             new Bytes(STRING_SERIALIZER.serialize(null, key)),
@@ -464,4 +578,40 @@ public class RocksDBVersionedStoreTest {
             STRING_DESERIALIZER.deserialize(null, versionedRecord.value()),
             versionedRecord.timestamp());
     }
+
+    private static List<ConsumerRecord<byte[], byte[]>> getChangelogRecords(final List<DataRecord> data) {
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+
+        for (final DataRecord d : data) {
+            final byte[] rawKey = STRING_SERIALIZER.serialize(null, d.key);
+            final byte[] rawValue = STRING_SERIALIZER.serialize(null, d.value);
+            records.add(new ConsumerRecord<>(
+                "",
+                0,
+                0L,
+                d.timestamp,
+                TimestampType.CREATE_TIME,
+                rawKey.length,
+                rawValue == null ? 0 : rawValue.length,
+                rawKey,
+                rawValue,
+                new RecordHeaders(),
+                Optional.empty()
+            ));
+        }
+
+        return records;
+    }
+
+    private static class DataRecord {
+        final String key;
+        final String value;
+        final long timestamp;
+
+        DataRecord(final String key, final String value, final long timestamp) {
+            this.key = key;
+            this.value = value;
+            this.timestamp = timestamp;
+        }
+    }
 }
\ No newline at end of file
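One subtlety worth illustrating from the write buffer above: the latest-value buffer stores Optional<byte[]> values so that a buffered tombstone (Optional.empty()) can be distinguished from a key that is simply absent from the buffer (lookup returns null), which is what makes deleteLatestValue() safe to buffer. A minimal standalone sketch of that pattern (names here are illustrative, not from the commit):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    class TombstoneBufferSketch {
        private final Map<String, Optional<byte[]>> buffer = new HashMap<>();

        void put(final String key, final byte[] value) {
            // null values are buffered as Optional.empty(), i.e., tombstones
            buffer.put(key, Optional.ofNullable(value));
        }

        byte[] get(final String key, final Map<String, byte[]> fallbackStore) {
            final Optional<byte[]> buffered = buffer.get(key);
            if (buffered != null) {
                // buffered entry found: may be a value or a tombstone (null)
                return buffered.orElse(null);
            }
            // not buffered at all: fall through to the underlying store
            return fallbackStore.get(key);
        }
    }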