This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 528a777df66 KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog (#13189)
528a777df66 is described below

commit 528a777df66ab1e66544ad2a8fb3d020ff229bff
Author: Victoria Xia <victoria....@confluent.io>
AuthorDate: Mon Feb 13 17:06:44 2023 -0800

    KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog (#13189)
    
    This PR builds on the new RocksDB-based versioned store implementation (see KIP-889) by adding code for restoring from changelog. The changelog topic format is the same as for regular timestamped key-value stores: record keys, values, and timestamps are stored in the Kafka message key, value, and timestamp, respectively. The code for actually writing to this changelog will come in a follow-up PR.
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../state/internals/RocksDBVersionedStore.java     |  90 ++++++-
 .../RocksDBVersionedStoreRestoreWriteBuffer.java   | 269 +++++++++++++++++++++
 .../state/internals/RocksDBVersionedStoreTest.java | 150 ++++++++++++
 3 files changed, 502 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
index e40fc36e5bb..f56ceccc0b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Collection;
@@ -24,9 +26,13 @@ import java.util.Optional;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
@@ -38,6 +44,8 @@ import org.apache.kafka.streams.state.VersionedRecord;
 import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
 import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,12 +86,14 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
 
     private final RocksDBStore latestValueStore;
     private final LogicalKeyValueSegments segmentStores;
-    private final VersionedStoreClient<LogicalKeyValueSegment> versionedStoreClient;
+    private final RocksDBVersionedStoreClient versionedStoreClient;
+    private final RocksDBVersionedStoreRestoreWriteBuffer restoreWriteBuffer;
 
     private ProcessorContext context;
     private StateStoreContext stateStoreContext;
     private Sensor expiredRecordSensor;
     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+    private boolean consistencyEnabled = false;
     private Position position;
     private OffsetCheckpoint positionCheckpoint;
     private volatile boolean open;
@@ -95,6 +105,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         this.latestValueStore = new RocksDBStore(latestValueStoreName(name), name, metricsRecorder);
         this.segmentStores = new LogicalKeyValueSegments(segmentsStoreName(name), name, historyRetention, segmentInterval, metricsRecorder);
         this.versionedStoreClient = new RocksDBVersionedStoreClient();
+        this.restoreWriteBuffer = new RocksDBVersionedStoreRestoreWriteBuffer(versionedStoreClient);
     }
 
     @Override
@@ -121,11 +132,11 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
     @Override
     public VersionedRecord<byte[]> get(final Bytes key) {
         // latest value (if present) is guaranteed to be in the latest value store
-        final byte[] latestValue = latestValueStore.get(key);
-        if (latestValue != null) {
+        final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
+        if (rawLatestValueAndTimestamp != null) {
             return new VersionedRecord<>(
-                LatestValueFormatter.getValue(latestValue),
-                LatestValueFormatter.getTimestamp(latestValue)
+                LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
+                LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp)
             );
         } else {
             return null;
@@ -256,6 +267,12 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         );
 
         open = true;
+
+        consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
+            context.appConfigs(),
+            IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
+            false
+        );
     }
 
     @Override
@@ -266,7 +283,41 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        throw new UnsupportedOperationException("not yet implemented");
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+        }
+
+        final VersionedStoreClient<?> restoreClient = restoreWriteBuffer.getClient();
+
+        // note: there is increased risk for hitting an out-of-memory during this restore loop,
+        // compared to for non-versioned key-value stores, because this versioned store
+        // implementation stores multiple records (for the same key) together in a single RocksDB
+        // "segment" entry -- restoring a single changelog entry could require loading multiple
+        // records into memory. how high this memory amplification will be is very much dependent
+        // on the specific workload and the value of the "segment interval" parameter.
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
+                record,
+                consistencyEnabled,
+                position
+            );
+
+            // put records to write buffer
+            doPut(
+                restoreClient,
+                Optional.empty(),
+                new Bytes(record.key()),
+                record.value(),
+                record.timestamp()
+            );
+        }
+
+        try {
+            restoreWriteBuffer.flush();
+        } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error restoring batch to store " + name, e);
+        }
     }
 
     /**
@@ -329,7 +380,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
     /**
      * Client for writing into (and reading from) this persistent {@link RocksDBVersionedStore}.
      */
-    private class RocksDBVersionedStoreClient implements VersionedStoreClient<LogicalKeyValueSegment> {
+    class RocksDBVersionedStoreClient implements VersionedStoreClient<LogicalKeyValueSegment> {
 
         @Override
         public byte[] getLatestValue(final Bytes key) {
@@ -360,6 +411,31 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         public long segmentIdForTimestamp(final long timestamp) {
             return segmentStores.segmentId(timestamp);
         }
+
+        /**
+         * Adds the provided record into the provided batch, in preparation for writing into the
+         * latest value store.
+         * <p>
+         * Together with {@link #writeLatestValues(WriteBatch)}, this method supports batch writes
+         * into the latest value store.
+         *
+         * @throws RocksDBException if a failure occurs adding the record to the {@link WriteBatch}
+         */
+        public void addToLatestValueBatch(final KeyValue<byte[], byte[]> record, final WriteBatch batch) throws RocksDBException {
+            latestValueStore.addToBatch(record, batch);
+        }
+
+        /**
+         * Writes the provided batch of records into the latest value store.
+         * <p>
+         * Together with {@link #addToLatestValueBatch(KeyValue, WriteBatch)}, this method supports
+         * batch writes into the latest value store.
+         *
+         * @throws RocksDBException if a failure occurs while writing the {@link WriteBatch} to the store
+         */
+        public void writeLatestValues(final WriteBatch batch) throws RocksDBException {
+            latestValueStore.write(batch);
+        }
     }
 
     private <T extends VersionedStoreSegment> void doPut(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
new file mode 100644
index 00000000000..e50913d8ad1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.RocksDBVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during
+ * restoration.
+ * <p>
+ * The structure of the internals of this write buffer mirrors the structure of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+    // write buffer for latest value store. value type is Optional in order to track tombstones
+    // which must be written to the underlying store.
+    private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+    // map from segment id to write buffer. segments are stored in reverse-sorted order,
+    // so getReverseSegments() is more efficient
+    private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+    private final RocksDBVersionedStoreClient dbClient;
+    private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+    /**
+     * Creates a new write buffer.
+     * @param dbClient client for reading from and writing to the underlying persistent store
+     */
+    RocksDBVersionedStoreRestoreWriteBuffer(final RocksDBVersionedStoreClient dbClient) {
+        this.dbClient = Objects.requireNonNull(dbClient);
+
+        this.latestValueWriteBuffer = new HashMap<>();
+        // store in reverse-sorted order, to make getReverseSegments() more efficient
+        this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+        this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+    }
+
+    /**
+     * @return client for writing to (and reading from) the write buffer
+     */
+    VersionedStoreClient<?> getClient() {
+        return restoreClient;
+    }
+
+    /**
+     * Flushes the contents of the write buffer into the persistent store, and clears the write
+     * buffer in the process.
+     * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+     */
+    void flush() throws RocksDBException {
+
+        // flush segments first, as this is consistent with the store always writing to
+        // older segments/stores before later ones
+        try (final WriteBatch segmentsBatch = new WriteBatch()) {
+            final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+            if (allSegments.size() > 0) {
+                // collect entries into write batch
+                for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+                    final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+                    for (final Map.Entry<Bytes, byte[]> segmentEntry : bufferSegment.getAll().entrySet()) {
+                        dbSegment.addToBatch(
+                            new KeyValue<>(segmentEntry.getKey().get(), segmentEntry.getValue()),
+                            segmentsBatch);
+                    }
+                }
+
+                // write to db. all the logical segments share the same physical store,
+                // so we can use any segment to perform the write
+                allSegments.get(0).dbSegment().write(segmentsBatch);
+            }
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore segments store.");
+            throw e;
+        }
+        segmentsWriteBuffer.clear();
+
+        // flush latest value store
+        try (final WriteBatch latestValueBatch = new WriteBatch()) {
+            // collect entries into write batch
+            for (final Map.Entry<Bytes, Optional<byte[]>> latestValueEntry : latestValueWriteBuffer.entrySet()) {
+                final byte[] value = latestValueEntry.getValue().orElse(null);
+                dbClient.addToLatestValueBatch(
+                    new KeyValue<>(latestValueEntry.getKey().get(), value),
+                    latestValueBatch);
+            }
+
+            // write to db
+            dbClient.writeLatestValues(latestValueBatch);
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore latest value store.");
+            throw e;
+        }
+        latestValueWriteBuffer.clear();
+    }
+
+    /**
+     * The object representation of the write buffer corresponding to a single segment store.
+     * Contains the write buffer itself (a simple hash map) and also a reference to the underlying
+     * persistent segment store.
+     */
+    private class WriteBufferSegmentWithDbFallback implements VersionedStoreSegment {
+
+        private final long id;
+        private final Map<Bytes, byte[]> data;
+        private final LogicalKeyValueSegment dbSegment;
+
+        WriteBufferSegmentWithDbFallback(final LogicalKeyValueSegment dbSegment) {
+            this.dbSegment = Objects.requireNonNull(dbSegment);
+            this.id = dbSegment.id();
+            this.data = new HashMap<>();
+
+            // register segment with segments store
+            segmentsWriteBuffer.put(id, this);
+        }
+
+        LogicalKeyValueSegment dbSegment() {
+            return dbSegment;
+        }
+
+        @Override
+        public long id() {
+            return id;
+        }
+
+        @Override
+        public void put(final Bytes key, final byte[] value) {
+            // all writes go to the write buffer
+            data.put(key, value);
+        }
+
+        @Override
+        public byte[] get(final Bytes key) {
+            final byte[] bufferValue = data.get(key);
+            if (bufferValue != null) {
+                return bufferValue;
+            }
+            return dbSegment.get(key);
+        }
+
+        Map<Bytes, byte[]> getAll() {
+            return Collections.unmodifiableMap(data);
+        }
+    }
+
+    /**
+     * Client for writing to (and reading from) the write buffer as part of restore.
+     */
+    private class RocksDBVersionedStoreRestoreClient implements VersionedStoreClient<WriteBufferSegmentWithDbFallback> {
+
+        @Override
+        public byte[] getLatestValue(final Bytes key) {
+            final Optional<byte[]> bufferValue = latestValueWriteBuffer.get(key);
+            if (bufferValue != null) {
+                return bufferValue.orElse(null);
+            }
+            return dbClient.getLatestValue(key);
+        }
+
+        @Override
+        public void putLatestValue(final Bytes key, final byte[] value) {
+            // all writes go to write buffer
+            latestValueWriteBuffer.put(key, Optional.ofNullable(value));
+        }
+
+        @Override
+        public void deleteLatestValue(final Bytes key) {
+            putLatestValue(key, null);
+        }
+
+        @Override
+        public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime) {
+            if (segmentsWriteBuffer.containsKey(segmentId)) {
+                return segmentsWriteBuffer.get(segmentId);
+            }
+
+            final LogicalKeyValueSegment dbSegment = dbClient.getOrCreateSegmentIfLive(segmentId, context, streamTime);
+            if (dbSegment == null) {
+                // segment is not live
+                return null;
+            }
+            // creating a new segment automatically registers it with the segments store
+            return new WriteBufferSegmentWithDbFallback(dbSegment);
+        }
+
+        @Override
+        public List<WriteBufferSegmentWithDbFallback> getReverseSegments(final long timestampFrom) {
+            // head and not tail because the map is sorted in reverse order
+            final long segmentFrom = segmentIdForTimestamp(timestampFrom);
+            final List<WriteBufferSegmentWithDbFallback> bufferSegments =
+                new ArrayList<>(segmentsWriteBuffer.headMap(segmentFrom, true).values());
+
+            final List<LogicalKeyValueSegment> dbSegments = dbClient.getReverseSegments(timestampFrom);
+
+            // merge segments from db with segments from write buffer
+            final List<WriteBufferSegmentWithDbFallback> allSegments = new ArrayList<>();
+            int dbIndex = 0;
+            int bufferIndex = 0;
+            while (dbIndex < dbSegments.size() && bufferIndex < bufferSegments.size()) {
+                final LogicalKeyValueSegment dbSegment = dbSegments.get(dbIndex);
+                final WriteBufferSegmentWithDbFallback bufferSegment = bufferSegments.get(bufferIndex);
+                final long dbSegmentId = dbSegment.id();
+                final long bufferSegmentId = bufferSegment.id();
+                if (dbSegmentId > bufferSegmentId) {
+                    // creating a new segment automatically registers it with the segments store
+                    allSegments.add(new WriteBufferSegmentWithDbFallback(dbSegment));
+                    dbIndex++;
+                } else if (dbSegmentId < bufferSegmentId) {
+                    allSegments.add(bufferSegment);
+                    bufferIndex++;
+                } else {
+                    allSegments.add(bufferSegment);
+                    dbIndex++;
+                    bufferIndex++;
+                }
+            }
+            while (dbIndex < dbSegments.size()) {
+                // creating a new segment automatically registers it with the segments store
+                allSegments.add(new WriteBufferSegmentWithDbFallback(dbSegments.get(dbIndex)));
+                dbIndex++;
+            }
+            while (bufferIndex < bufferSegments.size()) {
+                allSegments.add(bufferSegments.get(bufferIndex));
+                bufferIndex++;
+            }
+            return allSegments;
+        }
+
+        @Override
+        public long segmentIdForTimestamp(final long timestamp) {
+            return dbClient.segmentIdForTimestamp(timestamp);
+        }
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
index 0e7983a009a..fb97ad3a6b7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
@@ -20,6 +20,12 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
@@ -409,6 +415,114 @@ public class RocksDBVersionedStoreTest {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
     }
 
+    @Test
+    public void shouldRestore() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "vp20", SEGMENT_INTERVAL + 20));
+        records.add(new DataRecord("k", "vp10", SEGMENT_INTERVAL + 10));
+        records.add(new DataRecord("k", "vn10", SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", "vn2", SEGMENT_INTERVAL - 2));
+        records.add(new DataRecord("k", "vn1", SEGMENT_INTERVAL - 1));
+        records.add(new DataRecord("k", "vp1", SEGMENT_INTERVAL + 1));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "vp20", SEGMENT_INTERVAL + 20);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 30, "vp20", SEGMENT_INTERVAL + 20);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10", SEGMENT_INTERVAL + 10);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 5, "vp1", SEGMENT_INTERVAL + 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL, "vn1", SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 1, "vn1", SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 2, "vn2", SEGMENT_INTERVAL - 2);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10", SEGMENT_INTERVAL - 10);
+    }
+
+    @Test
+    public void shouldRestoreWithNulls() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 1));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 1));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 10));
+        records.add(new DataRecord("k", "vp5", SEGMENT_INTERVAL + 5));
+        records.add(new DataRecord("k", "vn5", SEGMENT_INTERVAL - 5));
+        records.add(new DataRecord("k", "vn6", SEGMENT_INTERVAL - 6));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetNullFromStore("k");
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5", SEGMENT_INTERVAL + 5);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
+    }
+
+    @Test
+    public void shouldRestoreWithNullsAndRepeatTimestamps() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL + 20));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL - 10)); // replaces existing null with non-null, with timestamps spanning segments
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10)); // replaces existing non-null with null
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL - 1));
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL + 1));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 1)); // replaces existing non-null with null
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 1)); // replaces existing non-null with null, with timestamps spanning segments
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 10));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 5));
+        records.add(new DataRecord("k", "vp5", SEGMENT_INTERVAL + 5)); // replaces existing null with non-null
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL - 5));
+        records.add(new DataRecord("k", "vn5", SEGMENT_INTERVAL - 5)); // replaces existing non-null with non-null
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20)); // replaces existing non-null (latest value) with null
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20)); // replaces existing null with null
+        records.add(new DataRecord("k", "vn6", SEGMENT_INTERVAL - 6));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetNullFromStore("k");
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5", SEGMENT_INTERVAL + 5);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
+    }
+
+    @Test
+    public void shouldRestoreMultipleBatches() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 20));
+        records.add(new DataRecord("k", "vn10", SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 1));
+
+        final List<DataRecord> moreRecords = new ArrayList<>();
+        moreRecords.add(new DataRecord("k", null, SEGMENT_INTERVAL + 1));
+        moreRecords.add(new DataRecord("k", "vp10", SEGMENT_INTERVAL + 10));
+        moreRecords.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20));
+
+        store.restoreBatch(getChangelogRecords(records));
+        store.restoreBatch(getChangelogRecords(moreRecords));
+
+        verifyGetNullFromStore("k");
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10", SEGMENT_INTERVAL + 10);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 5);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10", SEGMENT_INTERVAL - 10);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
+    }
+
     private void putToStore(final String key, final String value, final long timestamp) {
         store.put(
             new Bytes(STRING_SERIALIZER.serialize(null, key)),
@@ -464,4 +578,40 @@ public class RocksDBVersionedStoreTest {
             STRING_DESERIALIZER.deserialize(null, versionedRecord.value()),
             versionedRecord.timestamp());
     }
+
+    private static List<ConsumerRecord<byte[], byte[]>> getChangelogRecords(final List<DataRecord> data) {
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+
+        for (final DataRecord d : data) {
+            final byte[] rawKey = STRING_SERIALIZER.serialize(null, d.key);
+            final byte[] rawValue = STRING_SERIALIZER.serialize(null, d.value);
+            records.add(new ConsumerRecord<>(
+                "",
+                0,
+                0L,
+                d.timestamp,
+                TimestampType.CREATE_TIME,
+                rawKey.length,
+                rawValue == null ? 0 : rawValue.length,
+                rawKey,
+                rawValue,
+                new RecordHeaders(),
+                Optional.empty()
+            ));
+        }
+
+        return records;
+    }
+
+    private static class DataRecord {
+        final String key;
+        final String value;
+        final long timestamp;
+
+        DataRecord(final String key, final String value, final long timestamp) {
+            this.key = key;
+            this.value = value;
+            this.timestamp = timestamp;
+        }
+    }
 }
\ No newline at end of file
