This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 528a777df66 KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog (#13189)
528a777df66 is described below
commit 528a777df66ab1e66544ad2a8fb3d020ff229bff
Author: Victoria Xia <[email protected]>
AuthorDate: Mon Feb 13 17:06:44 2023 -0800
KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog (#13189)
This PR builds on the new RocksDB-based versioned store implementation (see
KIP-889) by adding code for restoring the store from its changelog. The
changelog topic format is the same as for regular timestamped key-value
stores: record keys, values, and timestamps are stored in the Kafka message
key, value, and timestamp, respectively. The code for actually writing to
this changelog will come in a follow-up PR.
Reviewers: Matthias J. Sax <[email protected]>
---
.../state/internals/RocksDBVersionedStore.java | 90 ++++++-
.../RocksDBVersionedStoreRestoreWriteBuffer.java | 269 +++++++++++++++++++++
.../state/internals/RocksDBVersionedStoreTest.java | 150 ++++++++++++
3 files changed, 502 insertions(+), 7 deletions(-)
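
For context: the changelog format described above maps one-to-one onto
consumer records. A minimal sketch of that mapping, assuming a hypothetical
VersionedPut callback in place of the store's private doPut(...) (the real
path is RocksDBVersionedStore#restoreBatch in the first diff):

    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.utils.Bytes;

    final class ChangelogFormatSketch {
        // hypothetical stand-in for the store's internal doPut(...) call
        interface VersionedPut {
            void put(Bytes key, byte[] value, long timestamp);
        }

        static void restore(final List<ConsumerRecord<byte[], byte[]>> records,
                            final VersionedPut store) {
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                store.put(
                    new Bytes(record.key()),   // store key == Kafka message key
                    record.value(),            // store value == message value; null is a tombstone
                    record.timestamp()         // record timestamp == message timestamp
                );
            }
        }
    }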
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
index e40fc36e5bb..f56ceccc0b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Collection;
@@ -24,9 +26,13 @@ import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
@@ -38,6 +44,8 @@ import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,12 +86,14 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
private final RocksDBStore latestValueStore;
private final LogicalKeyValueSegments segmentStores;
- private final VersionedStoreClient<LogicalKeyValueSegment> versionedStoreClient;
+ private final RocksDBVersionedStoreClient versionedStoreClient;
+ private final RocksDBVersionedStoreRestoreWriteBuffer restoreWriteBuffer;
private ProcessorContext context;
private StateStoreContext stateStoreContext;
private Sensor expiredRecordSensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+ private boolean consistencyEnabled = false;
private Position position;
private OffsetCheckpoint positionCheckpoint;
private volatile boolean open;
@@ -95,6 +105,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
this.latestValueStore = new RocksDBStore(latestValueStoreName(name), name, metricsRecorder);
this.segmentStores = new LogicalKeyValueSegments(segmentsStoreName(name), name, historyRetention, segmentInterval, metricsRecorder);
this.versionedStoreClient = new RocksDBVersionedStoreClient();
+ this.restoreWriteBuffer = new RocksDBVersionedStoreRestoreWriteBuffer(versionedStoreClient);
}
@Override
@@ -121,11 +132,11 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
@Override
public VersionedRecord<byte[]> get(final Bytes key) {
// latest value (if present) is guaranteed to be in the latest value store
- final byte[] latestValue = latestValueStore.get(key);
- if (latestValue != null) {
+ final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
+ if (rawLatestValueAndTimestamp != null) {
return new VersionedRecord<>(
- LatestValueFormatter.getValue(latestValue),
- LatestValueFormatter.getTimestamp(latestValue)
+ LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
+ LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp)
);
} else {
return null;
@@ -256,6 +267,12 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
);
open = true;
+
+ consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
+ context.appConfigs(),
+ IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
+ false
+ );
}
@Override
@@ -266,7 +283,41 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
// VisibleForTesting
void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
- throw new UnsupportedOperationException("not yet implemented");
+ // advance stream time to the max timestamp in the batch
+ for (final ConsumerRecord<byte[], byte[]> record : records) {
+ observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+ }
+
+ final VersionedStoreClient<?> restoreClient = restoreWriteBuffer.getClient();
+
+ // note: there is increased risk of hitting an out-of-memory error during this restore loop,
+ // compared to non-versioned key-value stores, because this versioned store implementation
+ // stores multiple records (for the same key) together in a single RocksDB "segment" entry --
+ // restoring a single changelog entry could require loading multiple records into memory.
+ // how high this memory amplification will be is very much dependent on the specific
+ // workload and the value of the "segment interval" parameter.
+ for (final ConsumerRecord<byte[], byte[]> record : records) {
+ ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
+ record,
+ consistencyEnabled,
+ position
+ );
+
+ // put records to write buffer
+ doPut(
+ restoreClient,
+ Optional.empty(),
+ new Bytes(record.key()),
+ record.value(),
+ record.timestamp()
+ );
+ }
+
+ try {
+ restoreWriteBuffer.flush();
+ } catch (final RocksDBException e) {
+ throw new ProcessorStateException("Error restoring batch to store
" + name, e);
+ }
}
/**
@@ -329,7 +380,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
/**
* Client for writing into (and reading from) this persistent {@link RocksDBVersionedStore}.
*/
- private class RocksDBVersionedStoreClient implements VersionedStoreClient<LogicalKeyValueSegment> {
+ class RocksDBVersionedStoreClient implements VersionedStoreClient<LogicalKeyValueSegment> {
@Override
public byte[] getLatestValue(final Bytes key) {
@@ -360,6 +411,31 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
public long segmentIdForTimestamp(final long timestamp) {
return segmentStores.segmentId(timestamp);
}
+
+ /**
+ * Adds the provided record into the provided batch, in preparation for writing into the
+ * latest value store.
+ * <p>
+ * Together with {@link #writeLatestValues(WriteBatch)}, this method supports batch writes
+ * into the latest value store.
+ *
+ * @throws RocksDBException if a failure occurs adding the record to the {@link WriteBatch}
+ */
+ public void addToLatestValueBatch(final KeyValue<byte[], byte[]> record, final WriteBatch batch) throws RocksDBException {
+ latestValueStore.addToBatch(record, batch);
+ }
+
+ /**
+ * Writes the provided batch of records into the latest value store.
+ * <p>
+ * Together with {@link #addToLatestValueBatch(KeyValue, WriteBatch)}, this method supports
+ * batch writes into the latest value store.
+ *
+ * @throws RocksDBException if a failure occurs while writing the {@link WriteBatch} to the store
+ */
+ public void writeLatestValues(final WriteBatch batch) throws RocksDBException {
+ latestValueStore.write(batch);
+ }
}
private <T extends VersionedStoreSegment> void doPut(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
new file mode 100644
index 00000000000..e50913d8ad1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.RocksDBVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during
+ * restoration.
+ * <p>
+ * The structure of the internals of this write buffer mirrors the structure of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+ private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+ // write buffer for latest value store. value type is Optional in order to track tombstones
+ // which must be written to the underlying store.
+ private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+ // map from segment id to write buffer. segments are stored in reverse-sorted order,
+ // so getReverseSegments() is more efficient
+ private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+ private final RocksDBVersionedStoreClient dbClient;
+ private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+ /**
+ * Creates a new write buffer.
+ * @param dbClient client for reading from and writing to the underlying persistent store
+ */
+ RocksDBVersionedStoreRestoreWriteBuffer(final RocksDBVersionedStoreClient dbClient) {
+ this.dbClient = Objects.requireNonNull(dbClient);
+
+ this.latestValueWriteBuffer = new HashMap<>();
+ // store in reverse-sorted order, to make getReverseSegments() more efficient
+ this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+ this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+ }
+
+ /**
+ * @return client for writing to (and reading from) the write buffer
+ */
+ VersionedStoreClient<?> getClient() {
+ return restoreClient;
+ }
+
+ /**
+ * Flushes the contents of the write buffer into the persistent store, and clears the write
+ * buffer in the process.
+ * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+ */
+ void flush() throws RocksDBException {
+
+ // flush segments first, as this is consistent with the store always writing to
+ // older segments/stores before later ones
+ try (final WriteBatch segmentsBatch = new WriteBatch()) {
+ final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+ if (allSegments.size() > 0) {
+ // collect entries into write batch
+ for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+ final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+ for (final Map.Entry<Bytes, byte[]> segmentEntry : bufferSegment.getAll().entrySet()) {
+ dbSegment.addToBatch(
+ new KeyValue<>(segmentEntry.getKey().get(), segmentEntry.getValue()),
+ segmentsBatch);
+ }
+ }
+
+ // write to db. all the logical segments share the same physical store,
+ // so we can use any segment to perform the write
+ allSegments.get(0).dbSegment().write(segmentsBatch);
+ }
+ } catch (final RocksDBException e) {
+ log.error("Error restoring batch to RocksDBVersionedStore segments
store.");
+ throw e;
+ }
+ segmentsWriteBuffer.clear();
+
+ // flush latest value store
+ try (final WriteBatch latestValueBatch = new WriteBatch()) {
+ // collect entries into write batch
+ for (final Map.Entry<Bytes, Optional<byte[]>> latestValueEntry : latestValueWriteBuffer.entrySet()) {
+ final byte[] value = latestValueEntry.getValue().orElse(null);
+ dbClient.addToLatestValueBatch(
+ new KeyValue<>(latestValueEntry.getKey().get(), value),
+ latestValueBatch);
+ }
+
+ // write to db
+ dbClient.writeLatestValues(latestValueBatch);
+ } catch (final RocksDBException e) {
+ log.error("Error restoring batch to RocksDBVersionedStore latest
value store.");
+ throw e;
+ }
+ latestValueWriteBuffer.clear();
+ }
+
+ /**
+ * The object representation of the write buffer corresponding to a single segment store.
+ * Contains the write buffer itself (a simple hash map) and also a reference to the underlying
+ * persistent segment store.
+ */
+ private class WriteBufferSegmentWithDbFallback implements VersionedStoreSegment {
+
+ private final long id;
+ private final Map<Bytes, byte[]> data;
+ private final LogicalKeyValueSegment dbSegment;
+
+ WriteBufferSegmentWithDbFallback(final LogicalKeyValueSegment dbSegment) {
+ this.dbSegment = Objects.requireNonNull(dbSegment);
+ this.id = dbSegment.id();
+ this.data = new HashMap<>();
+
+ // register segment with segments store
+ segmentsWriteBuffer.put(id, this);
+ }
+
+ LogicalKeyValueSegment dbSegment() {
+ return dbSegment;
+ }
+
+ @Override
+ public long id() {
+ return id;
+ }
+
+ @Override
+ public void put(final Bytes key, final byte[] value) {
+ // all writes go to the write buffer
+ data.put(key, value);
+ }
+
+ @Override
+ public byte[] get(final Bytes key) {
+ final byte[] bufferValue = data.get(key);
+ if (bufferValue != null) {
+ return bufferValue;
+ }
+ return dbSegment.get(key);
+ }
+
+ Map<Bytes, byte[]> getAll() {
+ return Collections.unmodifiableMap(data);
+ }
+ }
+
+ /**
+ * Client for writing to (and reading from) the write buffer as part of restore.
+ */
+ private class RocksDBVersionedStoreRestoreClient implements VersionedStoreClient<WriteBufferSegmentWithDbFallback> {
+
+ @Override
+ public byte[] getLatestValue(final Bytes key) {
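+ // a missing map entry means the key is not buffered (fall through to the db
+ // below), whereas a present Optional.empty() is a buffered tombstone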
+ final Optional<byte[]> bufferValue = latestValueWriteBuffer.get(key);
+ if (bufferValue != null) {
+ return bufferValue.orElse(null);
+ }
+ return dbClient.getLatestValue(key);
+ }
+
+ @Override
+ public void putLatestValue(final Bytes key, final byte[] value) {
+ // all writes go to write buffer
+ latestValueWriteBuffer.put(key, Optional.ofNullable(value));
+ }
+
+ @Override
+ public void deleteLatestValue(final Bytes key) {
+ putLatestValue(key, null);
+ }
+
+ @Override
+ public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime) {
+ if (segmentsWriteBuffer.containsKey(segmentId)) {
+ return segmentsWriteBuffer.get(segmentId);
+ }
+
+ final LogicalKeyValueSegment dbSegment = dbClient.getOrCreateSegmentIfLive(segmentId, context, streamTime);
+ if (dbSegment == null) {
+ // segment is not live
+ return null;
+ }
+ // creating a new segment automatically registers it with the segments store
+ return new WriteBufferSegmentWithDbFallback(dbSegment);
+ }
+
+ @Override
+ public List<WriteBufferSegmentWithDbFallback> getReverseSegments(final long timestampFrom) {
+ // head and not tail because the map is sorted in reverse order
+ final long segmentFrom = segmentIdForTimestamp(timestampFrom);
+ final List<WriteBufferSegmentWithDbFallback> bufferSegments =
+ new ArrayList<>(segmentsWriteBuffer.headMap(segmentFrom, true).values());
+
+ final List<LogicalKeyValueSegment> dbSegments = dbClient.getReverseSegments(timestampFrom);
+
+ // merge segments from db with segments from write buffer
+ final List<WriteBufferSegmentWithDbFallback> allSegments = new ArrayList<>();
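+ // both lists are sorted by segment id in descending order (newest first), so the
+ // loop below is a two-pointer merge that de-duplicates ids present in both lists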
+ int dbIndex = 0;
+ int bufferIndex = 0;
+ while (dbIndex < dbSegments.size() && bufferIndex < bufferSegments.size()) {
+ final LogicalKeyValueSegment dbSegment = dbSegments.get(dbIndex);
+ final WriteBufferSegmentWithDbFallback bufferSegment = bufferSegments.get(bufferIndex);
+ final long dbSegmentId = dbSegment.id();
+ final long bufferSegmentId = bufferSegment.id();
+ if (dbSegmentId > bufferSegmentId) {
+ // creating a new segment automatically registers it with the segments store
+ allSegments.add(new WriteBufferSegmentWithDbFallback(dbSegment));
+ dbIndex++;
+ } else if (dbSegmentId < bufferSegmentId) {
+ allSegments.add(bufferSegment);
+ bufferIndex++;
+ } else {
+ allSegments.add(bufferSegment);
+ dbIndex++;
+ bufferIndex++;
+ }
+ }
+ while (dbIndex < dbSegments.size()) {
+ // creating a new segment automatically registers it with the segments store
+ allSegments.add(new WriteBufferSegmentWithDbFallback(dbSegments.get(dbIndex)));
+ dbIndex++;
+ }
+ while (bufferIndex < bufferSegments.size()) {
+ allSegments.add(bufferSegments.get(bufferIndex));
+ bufferIndex++;
+ }
+ return allSegments;
+ }
+
+ @Override
+ public long segmentIdForTimestamp(final long timestamp) {
+ return dbClient.segmentIdForTimestamp(timestamp);
+ }
+ }
+}
\ No newline at end of file
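
As a usage note, a condensed sketch of how RocksDBVersionedStore#restoreBatch
(first diff above) is expected to drive this write buffer; putViaClient is a
hypothetical stand-in for the store's private doPut(...), while all other
names appear in the diffs:

    void restoreSketch(final RocksDBVersionedStore.RocksDBVersionedStoreClient dbClient,
                       final Iterable<ConsumerRecord<byte[], byte[]>> records) throws RocksDBException {
        final RocksDBVersionedStoreRestoreWriteBuffer buffer =
            new RocksDBVersionedStoreRestoreWriteBuffer(dbClient);
        final RocksDBVersionedStore.VersionedStoreClient<?> restoreClient = buffer.getClient();
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            // writes accumulate in the buffer; reads fall through to RocksDB on a miss
            putViaClient(restoreClient, new Bytes(record.key()), record.value(), record.timestamp());
        }
        // two WriteBatches in total (segments store + latest value store),
        // rather than one RocksDB write per restored record
        buffer.flush();
    }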
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
index 0e7983a009a..fb97ad3a6b7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
@@ -20,6 +20,12 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
@@ -409,6 +415,114 @@ public class RocksDBVersionedStoreTest {
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
}
+ @Test
+ public void shouldRestore() {
+ final List<DataRecord> records = new ArrayList<>();
+ records.add(new DataRecord("k", "vp20", SEGMENT_INTERVAL + 20));
+ records.add(new DataRecord("k", "vp10", SEGMENT_INTERVAL + 10));
+ records.add(new DataRecord("k", "vn10", SEGMENT_INTERVAL - 10));
+ records.add(new DataRecord("k", "vn2", SEGMENT_INTERVAL - 2));
+ records.add(new DataRecord("k", "vn1", SEGMENT_INTERVAL - 1));
+ records.add(new DataRecord("k", "vp1", SEGMENT_INTERVAL + 1));
+
+ store.restoreBatch(getChangelogRecords(records));
+
+ verifyGetValueFromStore("k", "vp20", SEGMENT_INTERVAL + 20);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 30, "vp20",
SEGMENT_INTERVAL + 20);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10",
SEGMENT_INTERVAL + 10);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 5, "vp1",
SEGMENT_INTERVAL + 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL, "vn1",
SEGMENT_INTERVAL - 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 1, "vn1",
SEGMENT_INTERVAL - 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 2, "vn2",
SEGMENT_INTERVAL - 2);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10",
SEGMENT_INTERVAL - 10);
+ }
+
+ @Test
+ public void shouldRestoreWithNulls() {
+ final List<DataRecord> records = new ArrayList<>();
+ records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20));
+ records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 1));
+ records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 1));
+ records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));
+ records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 10));
+ records.add(new DataRecord("k", "vp5", SEGMENT_INTERVAL + 5));
+ records.add(new DataRecord("k", "vn5", SEGMENT_INTERVAL - 5));
+ records.add(new DataRecord("k", "vn6", SEGMENT_INTERVAL - 6));
+
+ store.restoreBatch(getChangelogRecords(records));
+
+ verifyGetNullFromStore("k");
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5",
SEGMENT_INTERVAL + 5);
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5",
SEGMENT_INTERVAL - 5);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6",
SEGMENT_INTERVAL - 6);
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
+ }
+
+ @Test
+ public void shouldRestoreWithNullsAndRepeatTimestamps() {
+ final List<DataRecord> records = new ArrayList<>();
+ records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL +
20));
+ records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));
+ records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL -
10)); // replaces existing null with non-null, with timestamps spanning segments
+ records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10)); //
replaces existing non-null with null
+ records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL -
1));
+ records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL +
1));
+ records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 1)); //
replaces existing non-null with null
+ records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 1)); //
replaces existing non-null with null, with timestamps spanning segments
+ records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 10));
+ records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 5));
+ records.add(new DataRecord("k", "vp5", SEGMENT_INTERVAL + 5)); //
replaces existing null with non-null
+ records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL -
5));
+ records.add(new DataRecord("k", "vn5", SEGMENT_INTERVAL - 5)); //
replaces existing non-null with non-null
+ records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20)); //
replaces existing non-null (latest value) with null
+ records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20)); //
replaces existing null with null
+ records.add(new DataRecord("k", "vn6", SEGMENT_INTERVAL - 6));
+
+ store.restoreBatch(getChangelogRecords(records));
+
+ verifyGetNullFromStore("k");
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5",
SEGMENT_INTERVAL + 5);
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5",
SEGMENT_INTERVAL - 5);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6",
SEGMENT_INTERVAL - 6);
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
+ }
+
+ @Test
+ public void shouldRestoreMultipleBatches() {
+ final List<DataRecord> records = new ArrayList<>();
+ records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 20));
+ records.add(new DataRecord("k", "vn10", SEGMENT_INTERVAL - 10));
+ records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 1));
+
+ final List<DataRecord> moreRecords = new ArrayList<>();
+ moreRecords.add(new DataRecord("k", null, SEGMENT_INTERVAL + 1));
+ moreRecords.add(new DataRecord("k", "vp10", SEGMENT_INTERVAL + 10));
+ moreRecords.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20));
+
+ store.restoreBatch(getChangelogRecords(records));
+ store.restoreBatch(getChangelogRecords(moreRecords));
+
+ verifyGetNullFromStore("k");
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10",
SEGMENT_INTERVAL + 10);
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 5);
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10",
SEGMENT_INTERVAL - 10);
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
+ }
+
private void putToStore(final String key, final String value, final long timestamp) {
store.put(
new Bytes(STRING_SERIALIZER.serialize(null, key)),
@@ -464,4 +578,40 @@ public class RocksDBVersionedStoreTest {
STRING_DESERIALIZER.deserialize(null, versionedRecord.value()),
versionedRecord.timestamp());
}
+
+ private static List<ConsumerRecord<byte[], byte[]>> getChangelogRecords(final List<DataRecord> data) {
+ final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+
+ for (final DataRecord d : data) {
+ final byte[] rawKey = STRING_SERIALIZER.serialize(null, d.key);
+ final byte[] rawValue = STRING_SERIALIZER.serialize(null, d.value);
+ records.add(new ConsumerRecord<>(
+ "",
+ 0,
+ 0L,
+ d.timestamp,
+ TimestampType.CREATE_TIME,
+ rawKey.length,
+ rawValue == null ? 0 : rawValue.length,
+ rawKey,
+ rawValue,
+ new RecordHeaders(),
+ Optional.empty()
+ ));
+ }
+
+ return records;
+ }
+
+ private static class DataRecord {
+ final String key;
+ final String value;
+ final long timestamp;
+
+ DataRecord(final String key, final String value, final long timestamp) {
+ this.key = key;
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+ }
}
\ No newline at end of file