This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new df59cc1a015 KAFKA-14491: [20/N] Add public-facing methods for 
versioned stores (#13442)
df59cc1a015 is described below

commit df59cc1a0151accb82082baaa56543ea157830d2
Author: Victoria Xia <[email protected]>
AuthorDate: Wed Apr 5 12:27:53 2023 -0400

    KAFKA-14491: [20/N] Add public-facing methods for versioned stores (#13442)
    
    Until this PR, all the code added for KIP-889 for introducing versioned 
stores to Kafka Streams has been accessible from internal packages only. This 
PR exposes the stores via public Stores.java methods, and also updates the 
TopologyTestDriver.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../internals/KeyValueStoreMaterializer.java       |  6 +-
 .../org/apache/kafka/streams/state/Stores.java     | 99 +++++++++++++++++++++-
 .../VersionedKeyValueStoreIntegrationTest.java     | 39 ++++-----
 .../internals/InternalTopologyBuilderTest.java     | 11 +--
 .../org/apache/kafka/streams/state/StoresTest.java | 52 ++++++++++++
 .../apache/kafka/streams/TopologyTestDriver.java   | 35 ++++++++
 .../kafka/streams/TopologyTestDriverTest.java      | 82 +++++++++++++++++-
 8 files changed, 284 insertions(+), 42 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 5c95c8d4284..6962d0a5c4a 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -236,7 +236,7 @@
               
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>
 
     <suppress checks="NPathComplexity"
-              
files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
+              
files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest).java"/>
 
     <suppress 
checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
               files="Murmur3Test.java"/>
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index 33792dafb71..468846c7dee 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -61,11 +60,10 @@ public class KeyValueStoreMaterializer<K, V> {
 
         final StoreBuilder<?> builder;
         if (supplier instanceof VersionedBytesStoreSupplier) {
-            builder = new VersionedKeyValueStoreBuilder<>(
+            builder = Stores.versionedKeyValueStoreBuilder(
                 (VersionedBytesStoreSupplier) supplier,
                 materialized.keySerde(),
-                materialized.valueSerde(),
-                Time.SYSTEM);
+                materialized.valueSerde());
         } else {
             builder = Stores.timestampedKeyValueStoreBuilder(
                 supplier,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 17fbbb46d35..ea6e4cd71d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -26,10 +26,12 @@ import 
org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
 import 
org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier;
 import 
org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier;
+import 
org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
 import 
org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import 
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;
 import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 
 import java.time.Duration;
@@ -81,8 +83,10 @@ public final class Stores {
      * Create a persistent {@link KeyValueBytesStoreSupplier}.
      * <p>
      * This store supplier can be passed into a {@link 
#keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
-     * If you want to create a {@link TimestampedKeyValueStore} you should use
-     * {@link #persistentTimestampedKeyValueStore(String)} to create a store 
supplier instead.
+     * If you want to create a {@link TimestampedKeyValueStore} or {@link 
VersionedKeyValueStore}
+     * you should use {@link #persistentTimestampedKeyValueStore(String)} or
+     * {@link #persistentVersionedKeyValueStore(String, Duration)}, 
respectively,
+     * to create a store supplier instead.
      *
      * @param name  name of the store (cannot be {@code null})
      * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be 
used
@@ -98,8 +102,10 @@ public final class Stores {
      * <p>
      * This store supplier can be passed into a
      * {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, 
Serde, Serde)}.
-     * If you want to create a {@link KeyValueStore} you should use
-     * {@link #persistentKeyValueStore(String)} to create a store supplier 
instead.
+     * If you want to create a {@link KeyValueStore} or a {@link 
VersionedKeyValueStore}
+     * you should use {@link #persistentKeyValueStore(String)} or
+     * {@link #persistentVersionedKeyValueStore(String, Duration)}, 
respectively,
+     * to create a store supplier instead.
      *
      * @param name  name of the store (cannot be {@code null})
      * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be 
used
@@ -110,6 +116,73 @@ public final class Stores {
         return new RocksDbKeyValueBytesStoreSupplier(name, true);
     }
 
+    /**
+     * Create a persistent versioned key-value store {@link 
VersionedBytesStoreSupplier}.
+     * <p>
+     * This store supplier can be passed into a
+     * {@link #versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, 
Serde, Serde)}.
+     *
+     * @param name             name of the store (cannot be {@code null})
+     * @param historyRetention length of time that old record versions are 
available for query
+     *                         (cannot be negative). If a timestamp bound 
provided to
+     *                         {@link VersionedKeyValueStore#get(Object, 
long)} is older than this
+     *                         specified history retention, then the get 
operation will not return data.
+     *                         This parameter also determines the "grace 
period" after which
+     *                         out-of-order writes will no longer be accepted.
+     * @return an instance of {@link VersionedBytesStoreSupplier}
+     * @throws IllegalArgumentException if {@code historyRetention} can't be 
represented as {@code long milliseconds}
+     */
+    public static VersionedBytesStoreSupplier 
persistentVersionedKeyValueStore(final String name,
+                                                                               
final Duration historyRetention) {
+        Objects.requireNonNull(name, "name cannot be null");
+        final String hrMsgPrefix = 
prepareMillisCheckFailMsgPrefix(historyRetention, "historyRetention");
+        final long historyRetentionMs = 
validateMillisecondDuration(historyRetention, hrMsgPrefix);
+        if (historyRetentionMs < 0L) {
+            throw new IllegalArgumentException("historyRetention cannot be 
negative");
+        }
+        return new RocksDbVersionedKeyValueBytesStoreSupplier(name, 
historyRetentionMs);
+    }
+
+    /**
+     * Create a persistent versioned key-value store {@link 
VersionedBytesStoreSupplier}.
+     * <p>
+     * This store supplier can be passed into a
+     * {@link #versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, 
Serde, Serde)}.
+     *
+     * @param name             name of the store (cannot be {@code null})
+     * @param historyRetention length of time that old record versions are 
available for query
+     *                         (cannot be negative). If a timestamp bound 
provided to
+     *                         {@link VersionedKeyValueStore#get(Object, 
long)} is older than this
+     *                         specified history retention, then the get 
operation will not return data.
+     *                         This parameter also determines the "grace 
period" after which
+     *                         out-of-order writes will no longer be accepted.
+     * @param segmentInterval  size of segments for storing old record 
versions (must be positive). Old record versions
+     *                         for the same key in a single segment are stored 
(updated and accessed) together.
+     *                         The only impact of this parameter is 
performance. If segments are large
+     *                         and a workload results in many record versions 
for the same key being collected
+     *                         in a single segment, performance may degrade as 
a result. On the other hand,
+     *                         historical reads (which access older segments) 
and out-of-order writes may
+     *                         slow down if there are too many segments.
+     * @return an instance of {@link VersionedBytesStoreSupplier}
+     * @throws IllegalArgumentException if {@code historyRetention} or {@code 
segmentInterval} can't be represented as {@code long milliseconds}
+     */
+    public static VersionedBytesStoreSupplier 
persistentVersionedKeyValueStore(final String name,
+                                                                               
final Duration historyRetention,
+                                                                               
final Duration segmentInterval) {
+        Objects.requireNonNull(name, "name cannot be null");
+        final String hrMsgPrefix = 
prepareMillisCheckFailMsgPrefix(historyRetention, "historyRetention");
+        final long historyRetentionMs = 
validateMillisecondDuration(historyRetention, hrMsgPrefix);
+        if (historyRetentionMs < 0L) {
+            throw new IllegalArgumentException("historyRetention cannot be 
negative");
+        }
+        final String siMsgPrefix = 
prepareMillisCheckFailMsgPrefix(segmentInterval, "segmentInterval");
+        final long segmentIntervalMs = 
validateMillisecondDuration(segmentInterval, siMsgPrefix);
+        if (segmentIntervalMs < 1L) {
+            throw new IllegalArgumentException("segmentInterval cannot be zero 
or negative");
+        }
+        return new RocksDbVersionedKeyValueBytesStoreSupplier(name, 
historyRetentionMs, segmentIntervalMs);
+    }
+
     /**
      * Create an in-memory {@link KeyValueBytesStoreSupplier}.
      * <p>
@@ -388,6 +461,24 @@ public final class Stores {
         return new TimestampedKeyValueStoreBuilder<>(supplier, keySerde, 
valueSerde, Time.SYSTEM);
     }
 
+    /**
+     * Creates a {@link StoreBuilder} that can be used to build a {@link 
VersionedKeyValueStore}.
+     *
+     * @param supplier   a {@link VersionedBytesStoreSupplier} (cannot be 
{@code null})
+     * @param keySerde   the key serde to use
+     * @param valueSerde the value serde to use; if the serialized bytes is 
{@code null} for put operations,
+     *                   it is treated as a deletion
+     * @param <K>        key type
+     * @param <V>        value type
+     * @return an instance of a {@link StoreBuilder} that can build a {@link 
VersionedKeyValueStore}
+     */
+    public static <K, V> StoreBuilder<VersionedKeyValueStore<K, V>> 
versionedKeyValueStoreBuilder(final VersionedBytesStoreSupplier supplier,
+                                                                               
                   final Serde<K> keySerde,
+                                                                               
                   final Serde<V> valueSerde) {
+        Objects.requireNonNull(supplier, "supplier cannot be null");
+        return new VersionedKeyValueStoreBuilder<>(supplier, keySerde, 
valueSerde, Time.SYSTEM);
+    }
+
     /**
      * Creates a {@link StoreBuilder} that can be used to build a {@link 
WindowStore}.
      * <p>
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java
index 9469534a09e..39d5d526649 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java
@@ -35,7 +35,6 @@ import 
org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -59,11 +58,10 @@ import org.apache.kafka.streams.query.RangeQuery;
 import org.apache.kafka.streams.query.StateQueryRequest;
 import org.apache.kafka.streams.query.StateQueryResult;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
 import org.apache.kafka.streams.state.VersionedKeyValueStore;
 import org.apache.kafka.streams.state.VersionedRecord;
-import 
org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
 import 
org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
@@ -131,11 +129,10 @@ public class VersionedKeyValueStoreIntegrationTest {
 
         streamsBuilder
             .addStateStore(
-                new VersionedKeyValueStoreBuilder<>(
-                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, 
HISTORY_RETENTION),
+                Stores.versionedKeyValueStoreBuilder(
+                    Stores.persistentVersionedKeyValueStore(STORE_NAME, 
Duration.ofMillis(HISTORY_RETENTION)),
                     Serdes.Integer(),
-                    Serdes.String(),
-                    Time.SYSTEM
+                    Serdes.String()
                 )
             )
             .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
@@ -176,11 +173,10 @@ public class VersionedKeyValueStoreIntegrationTest {
 
         streamsBuilder
             .addStateStore(
-                new VersionedKeyValueStoreBuilder<>(
-                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, 
HISTORY_RETENTION),
+                Stores.versionedKeyValueStoreBuilder(
+                    Stores.persistentVersionedKeyValueStore(STORE_NAME, 
Duration.ofMillis(HISTORY_RETENTION)),
                     Serdes.Integer(),
-                    Serdes.String(),
-                    Time.SYSTEM
+                    Serdes.String()
                 )
             )
             .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
@@ -216,11 +212,10 @@ public class VersionedKeyValueStoreIntegrationTest {
 
         streamsBuilder
             .addStateStore(
-                new VersionedKeyValueStoreBuilder<>(
-                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, 
HISTORY_RETENTION),
+                Stores.versionedKeyValueStoreBuilder(
+                    Stores.persistentVersionedKeyValueStore(STORE_NAME, 
Duration.ofMillis(HISTORY_RETENTION)),
                     Serdes.Integer(),
-                    Serdes.String(),
-                    Time.SYSTEM
+                    Serdes.String()
                 )
             )
             .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
@@ -259,11 +254,10 @@ public class VersionedKeyValueStoreIntegrationTest {
 
         streamsBuilder
             .addStateStore(
-                new VersionedKeyValueStoreBuilder<>(
-                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, 
HISTORY_RETENTION),
+                Stores.versionedKeyValueStoreBuilder(
+                    Stores.persistentVersionedKeyValueStore(STORE_NAME, 
Duration.ofMillis(HISTORY_RETENTION)),
                     Serdes.Integer(),
-                    Serdes.String(),
-                    Time.SYSTEM
+                    Serdes.String()
                 )
             )
             .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
@@ -298,11 +292,10 @@ public class VersionedKeyValueStoreIntegrationTest {
 
         streamsBuilder
             .addStateStore(
-                new VersionedKeyValueStoreBuilder<>(
+                Stores.versionedKeyValueStoreBuilder(
                     new CustomIQv2VersionedStoreSupplier(),
                     Serdes.Integer(),
-                    Serdes.String(),
-                    Time.SYSTEM
+                    Serdes.String()
                 )
             )
             .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
@@ -338,7 +331,7 @@ public class VersionedKeyValueStoreIntegrationTest {
                 globalTableTopic,
                 Consumed.with(Serdes.Integer(), Serdes.String()),
                 Materialized
-                    .<Integer, String>as(new 
RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION))
+                    .<Integer, 
String>as(Stores.persistentVersionedKeyValueStore(STORE_NAME, 
Duration.ofMillis(HISTORY_RETENTION)))
                     .withKeySerde(Serdes.Integer())
                     .withValueSerde(Serdes.String()));
         streamsBuilder
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 4ff86cbe6b4..fbf0b91954e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -16,11 +16,11 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.time.Duration;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyDescription;
@@ -37,8 +37,6 @@ import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import 
org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
 import org.apache.kafka.test.MockApiProcessor;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockKeyValueStoreBuilder;
@@ -909,11 +907,10 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockApiProcessorSupplier<>(), 
"source");
         builder.addStateStore(
-            new VersionedKeyValueStoreBuilder<>(
-                new RocksDbVersionedKeyValueBytesStoreSupplier("vstore", 
60_000L),
+            Stores.versionedKeyValueStoreBuilder(
+                Stores.persistentVersionedKeyValueStore("vstore", 
Duration.ofMillis(60_000L)),
                 Serdes.String(),
-                Serdes.String(),
-                new MockTime()
+                Serdes.String()
             ),
             "processor"
         );
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 90f019aa7cc..98726f888fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
@@ -32,6 +33,7 @@ import org.junit.Test;
 import static java.time.Duration.ZERO;
 import static java.time.Duration.ofMillis;
 import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
@@ -53,6 +55,33 @@ public class StoresTest {
         assertEquals("name cannot be null", e.getMessage());
     }
 
+    @Test
+    public void shouldThrowIfPersistentVersionedKeyValueStoreStoreNameIsNull() 
{
+        Exception e = assertThrows(NullPointerException.class, () -> 
Stores.persistentVersionedKeyValueStore(null, ZERO));
+        assertEquals("name cannot be null", e.getMessage());
+
+        e = assertThrows(NullPointerException.class, () -> 
Stores.persistentVersionedKeyValueStore(null, ZERO, ofMillis(1)));
+        assertEquals("name cannot be null", e.getMessage());
+    }
+
+    @Test
+    public void 
shouldThrowIfPersistentVersionedKeyValueStoreHistoryRetentionIsNegative() {
+        Exception e = assertThrows(IllegalArgumentException.class, () -> 
Stores.persistentVersionedKeyValueStore("anyName", ofMillis(-1)));
+        assertEquals("historyRetention cannot be negative", e.getMessage());
+
+        e = assertThrows(IllegalArgumentException.class, () -> 
Stores.persistentVersionedKeyValueStore("anyName", ofMillis(-1), ofMillis(1)));
+        assertEquals("historyRetention cannot be negative", e.getMessage());
+    }
+
+    @Test
+    public void 
shouldThrowIfPersistentVersionedKeyValueStoreSegmentIntervalIsZeroOrNegative() {
+        Exception e = assertThrows(IllegalArgumentException.class, () -> 
Stores.persistentVersionedKeyValueStore("anyName", ZERO, ZERO));
+        assertEquals("segmentInterval cannot be zero or negative", 
e.getMessage());
+
+        e = assertThrows(IllegalArgumentException.class, () -> 
Stores.persistentVersionedKeyValueStore("anyName", ZERO, ofMillis(-1)));
+        assertEquals("segmentInterval cannot be zero or negative", 
e.getMessage());
+    }
+
     @Test
     public void shouldThrowIfIMemoryKeyValueStoreStoreNameIsNull() {
         final Exception e = assertThrows(NullPointerException.class, () -> 
Stores.inMemoryKeyValueStore(null));
@@ -131,6 +160,12 @@ public class StoresTest {
         assertEquals("supplier cannot be null", e.getMessage());
     }
 
+    @Test
+    public void shouldThrowIfSupplierIsNullForVersionedKeyValueStoreBuilder() {
+        final Exception e = assertThrows(NullPointerException.class, () -> 
Stores.versionedKeyValueStoreBuilder(null, Serdes.ByteArray(), 
Serdes.ByteArray()));
+        assertEquals("supplier cannot be null", e.getMessage());
+    }
+
     @Test
     public void shouldThrowIfSupplierIsNullForSessionStoreBuilder() {
         final Exception e = assertThrows(NullPointerException.class, () -> 
Stores.sessionStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray()));
@@ -159,6 +194,13 @@ public class StoresTest {
         assertThat(Stores.persistentTimestampedKeyValueStore("store").get(), 
instanceOf(RocksDBTimestampedStore.class));
     }
 
+    @Test
+    public void shouldCreateRocksDbVersionedStore() {
+        final KeyValueStore<Bytes, byte[]> store = 
Stores.persistentVersionedKeyValueStore("store", ofMillis(1)).get();
+        assertThat(store, instanceOf(VersionedBytesStore.class));
+        assertThat(store.persistent(), equalTo(true));
+    }
+
     @Test
     public void shouldCreateRocksDbWindowStore() {
         final WindowStore store = Stores.persistentWindowStore("store", 
ofMillis(1L), ofMillis(1L), false).get();
@@ -221,6 +263,16 @@ public class StoresTest {
         assertThat(((WrappedStateStore) store).wrapped(), 
instanceOf(TimestampedBytesStore.class));
     }
 
+    @Test
+    public void shouldBuildVersionedKeyValueStore() {
+        final VersionedKeyValueStore<String, String> store = 
Stores.versionedKeyValueStoreBuilder(
+            Stores.persistentVersionedKeyValueStore("name", ofMillis(1)),
+            Serdes.String(),
+            Serdes.String()
+        ).build();
+        assertThat(store, not(nullValue()));
+    }
+
     @Test
     public void shouldBuildWindowStore() {
         final WindowStore<String, String> store = Stores.windowStoreBuilder(
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 96e8030d8fd..14123c36404 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -81,6 +81,7 @@ import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.ReadOnlyKeyValueStoreFacade;
@@ -876,6 +877,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
+     * @see #getVersionedKeyValueStore(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
@@ -906,6 +908,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getAllStateStores()
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
+     * @see #getVersionedKeyValueStore(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
@@ -941,6 +944,10 @@ public class TopologyTestDriver implements Closeable {
     }
 
     private void throwIfBuiltInStore(final StateStore stateStore) {
+        if (stateStore instanceof VersionedKeyValueStore) {
+            throw new IllegalArgumentException("Store " + stateStore.name()
+                                                   + " is a versioned 
key-value store and should be accessed via `getVersionedKeyValueStore()`");
+        }
         if (stateStore instanceof TimestampedKeyValueStore) {
             throw new IllegalArgumentException("Store " + stateStore.name()
                                                    + " is a timestamped 
key-value store and should be accessed via `getTimestampedKeyValueStore()`");
@@ -980,6 +987,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getAllStateStores()
      * @see #getStateStore(String)
      * @see #getTimestampedKeyValueStore(String)
+     * @see #getVersionedKeyValueStore(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
@@ -1006,6 +1014,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getAllStateStores()
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
+     * @see #getVersionedKeyValueStore(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
@@ -1016,6 +1025,29 @@ public class TopologyTestDriver implements Closeable {
         return store instanceof TimestampedKeyValueStore ? 
(TimestampedKeyValueStore<K, V>) store : null;
     }
 
+    /**
+     * Get the {@link VersionedKeyValueStore} with the given name.
+     * The store can be a "regular" or global store.
+     * <p>
+     * This is often useful in test cases to pre-populate the store before the 
test case instructs the topology to
+     * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, 
and/or to check the store afterward.
+     *
+     * @param name the name of the store
+     * @return the key value store, or {@code null} if no {@link 
VersionedKeyValueStore} has been registered with the given name
+     * @see #getAllStateStores()
+     * @see #getStateStore(String)
+     * @see #getKeyValueStore(String)
+     * @see #getTimestampedKeyValueStore(String)
+     * @see #getWindowStore(String)
+     * @see #getTimestampedWindowStore(String)
+     * @see #getSessionStore(String)
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> VersionedKeyValueStore<K, V> getVersionedKeyValueStore(final 
String name) {
+        final StateStore store = getStateStore(name, false);
+        return store instanceof VersionedKeyValueStore ? 
(VersionedKeyValueStore<K, V>) store : null;
+    }
+
     /**
      * Get the {@link WindowStore} or {@link TimestampedWindowStore} with the 
given name.
      * The store can be a "regular" or global store.
@@ -1034,6 +1066,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
+     * @see #getVersionedKeyValueStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
      */
@@ -1060,6 +1093,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
+     * @see #getVersionedKeyValueStore(String)
      * @see #getWindowStore(String)
      * @see #getSessionStore(String)
      */
@@ -1082,6 +1116,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
+     * @see #getVersionedKeyValueStore(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      */
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index a744bc12440..aa940f8539e 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -911,11 +911,13 @@ public abstract class TopologyTestDriverTest {
     private void shouldReturnCorrectStoreTypeOnly(final boolean persistent) {
         final String keyValueStoreName = "keyValueStore";
         final String timestampedKeyValueStoreName = "keyValueTimestampStore";
+        final String versionedKeyValueStoreName = "keyValueVersionedStore";
         final String windowStoreName = "windowStore";
         final String timestampedWindowStoreName = "windowTimestampStore";
         final String sessionStoreName = "sessionStore";
         final String globalKeyValueStoreName = "globalKeyValueStore";
         final String globalTimestampedKeyValueStoreName = 
"globalKeyValueTimestampStore";
+        final String globalVersionedKeyValueStoreName = 
"globalKeyValueVersionedStore";
 
         final Topology topology = setupSingleProcessorTopology();
         addStoresToTopology(
@@ -923,11 +925,13 @@ public abstract class TopologyTestDriverTest {
             persistent,
             keyValueStoreName,
             timestampedKeyValueStoreName,
+            versionedKeyValueStoreName,
             windowStoreName,
             timestampedWindowStoreName,
             sessionStoreName,
             globalKeyValueStoreName,
-            globalTimestampedKeyValueStoreName);
+            globalTimestampedKeyValueStoreName,
+            globalVersionedKeyValueStoreName);
 
 
         testDriver = new TopologyTestDriver(topology, config);
@@ -935,30 +939,44 @@ public abstract class TopologyTestDriverTest {
         // verify state stores
         assertNotNull(testDriver.getKeyValueStore(keyValueStoreName));
         assertNull(testDriver.getTimestampedKeyValueStore(keyValueStoreName));
+        assertNull(testDriver.getVersionedKeyValueStore(keyValueStoreName));
         assertNull(testDriver.getWindowStore(keyValueStoreName));
         assertNull(testDriver.getTimestampedWindowStore(keyValueStoreName));
         assertNull(testDriver.getSessionStore(keyValueStoreName));
 
         
assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreName));
         
assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreName));
+        
assertNull(testDriver.getVersionedKeyValueStore(timestampedKeyValueStoreName));
         assertNull(testDriver.getWindowStore(timestampedKeyValueStoreName));
         
assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreName));
         assertNull(testDriver.getSessionStore(timestampedKeyValueStoreName));
 
+        if (persistent) { // versioned stores do not offer an in-memory 
version yet, so nothing to test/verify unless persistent
+            
assertNull(testDriver.getKeyValueStore(versionedKeyValueStoreName));
+            
assertNull(testDriver.getTimestampedKeyValueStore(versionedKeyValueStoreName));
+            
assertNotNull(testDriver.getVersionedKeyValueStore(versionedKeyValueStoreName));
+            assertNull(testDriver.getWindowStore(versionedKeyValueStoreName));
+            
assertNull(testDriver.getTimestampedWindowStore(versionedKeyValueStoreName));
+            assertNull(testDriver.getSessionStore(versionedKeyValueStoreName));
+        }
+
         assertNull(testDriver.getKeyValueStore(windowStoreName));
         assertNull(testDriver.getTimestampedKeyValueStore(windowStoreName));
+        assertNull(testDriver.getVersionedKeyValueStore(windowStoreName));
         assertNotNull(testDriver.getWindowStore(windowStoreName));
         assertNull(testDriver.getTimestampedWindowStore(windowStoreName));
         assertNull(testDriver.getSessionStore(windowStoreName));
 
         assertNull(testDriver.getKeyValueStore(timestampedWindowStoreName));
         
assertNull(testDriver.getTimestampedKeyValueStore(timestampedWindowStoreName));
+        
assertNull(testDriver.getVersionedKeyValueStore(timestampedWindowStoreName));
         assertNotNull(testDriver.getWindowStore(timestampedWindowStoreName));
         
assertNotNull(testDriver.getTimestampedWindowStore(timestampedWindowStoreName));
         assertNull(testDriver.getSessionStore(timestampedWindowStoreName));
 
         assertNull(testDriver.getKeyValueStore(sessionStoreName));
         assertNull(testDriver.getTimestampedKeyValueStore(sessionStoreName));
+        assertNull(testDriver.getVersionedKeyValueStore(sessionStoreName));
         assertNull(testDriver.getWindowStore(sessionStoreName));
         assertNull(testDriver.getTimestampedWindowStore(sessionStoreName));
         assertNotNull(testDriver.getSessionStore(sessionStoreName));
@@ -966,15 +984,26 @@ public abstract class TopologyTestDriverTest {
         // verify global stores
         assertNotNull(testDriver.getKeyValueStore(globalKeyValueStoreName));
         
assertNull(testDriver.getTimestampedKeyValueStore(globalKeyValueStoreName));
+        
assertNull(testDriver.getVersionedKeyValueStore(globalKeyValueStoreName));
         assertNull(testDriver.getWindowStore(globalKeyValueStoreName));
         
assertNull(testDriver.getTimestampedWindowStore(globalKeyValueStoreName));
         assertNull(testDriver.getSessionStore(globalKeyValueStoreName));
 
         
assertNotNull(testDriver.getKeyValueStore(globalTimestampedKeyValueStoreName));
         
assertNotNull(testDriver.getTimestampedKeyValueStore(globalTimestampedKeyValueStoreName));
+        
assertNull(testDriver.getVersionedKeyValueStore(globalTimestampedKeyValueStoreName));
         
assertNull(testDriver.getWindowStore(globalTimestampedKeyValueStoreName));
         
assertNull(testDriver.getTimestampedWindowStore(globalTimestampedKeyValueStoreName));
         
assertNull(testDriver.getSessionStore(globalTimestampedKeyValueStoreName));
+
+        if (persistent) { // versioned stores do not offer an in-memory 
version yet, so nothing to test/verify unless persistent
+            
assertNull(testDriver.getKeyValueStore(globalVersionedKeyValueStoreName));
+            
assertNull(testDriver.getTimestampedKeyValueStore(globalVersionedKeyValueStoreName));
+            
assertNotNull(testDriver.getVersionedKeyValueStore(globalVersionedKeyValueStoreName));
+            
assertNull(testDriver.getWindowStore(globalVersionedKeyValueStoreName));
+            
assertNull(testDriver.getTimestampedWindowStore(globalVersionedKeyValueStoreName));
+            
assertNull(testDriver.getSessionStore(globalVersionedKeyValueStoreName));
+        }
     }
 
     @Test
@@ -990,11 +1019,13 @@ public abstract class TopologyTestDriverTest {
     private void shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(final 
boolean persistent) {
         final String keyValueStoreName = "keyValueStore";
         final String timestampedKeyValueStoreName = "keyValueTimestampStore";
+        final String versionedKeyValueStoreName = "keyValueVersionedStore";
         final String windowStoreName = "windowStore";
         final String timestampedWindowStoreName = "windowTimestampStore";
         final String sessionStoreName = "sessionStore";
         final String globalKeyValueStoreName = "globalKeyValueStore";
         final String globalTimestampedKeyValueStoreName = 
"globalKeyValueTimestampStore";
+        final String globalVersionedKeyValueStoreName = 
"globalKeyValueVersionedStore";
 
         final Topology topology = setupSingleProcessorTopology();
         addStoresToTopology(
@@ -1002,11 +1033,13 @@ public abstract class TopologyTestDriverTest {
             persistent,
             keyValueStoreName,
             timestampedKeyValueStoreName,
+            versionedKeyValueStoreName,
             windowStoreName,
             timestampedWindowStoreName,
             sessionStoreName,
             globalKeyValueStoreName,
-            globalTimestampedKeyValueStoreName);
+            globalTimestampedKeyValueStoreName,
+            globalVersionedKeyValueStoreName);
 
 
         testDriver = new TopologyTestDriver(topology, config);
@@ -1029,6 +1062,15 @@ public abstract class TopologyTestDriverTest {
                 equalTo("Store " + timestampedKeyValueStoreName
                     + " is a timestamped key-value store and should be 
accessed via `getTimestampedKeyValueStore()`"));
         }
+        if (persistent) { // versioned stores do not offer an in-memory 
version yet, so nothing to test/verify unless persistent
+            final IllegalArgumentException e = assertThrows(
+                IllegalArgumentException.class,
+                () -> testDriver.getStateStore(versionedKeyValueStoreName));
+            assertThat(
+                e.getMessage(),
+                equalTo("Store " + versionedKeyValueStoreName
+                    + " is a versioned key-value store and should be accessed 
via `getVersionedKeyValueStore()`"));
+        }
         {
             final IllegalArgumentException e = assertThrows(
                 IllegalArgumentException.class,
@@ -1074,6 +1116,15 @@ public abstract class TopologyTestDriverTest {
                 equalTo("Store " + globalTimestampedKeyValueStoreName
                     + " is a timestamped key-value store and should be 
accessed via `getTimestampedKeyValueStore()`"));
         }
+        if (persistent) { // versioned stores do not offer an in-memory 
version yet, so nothing to test/verify unless persistent
+            final IllegalArgumentException e = assertThrows(
+                IllegalArgumentException.class,
+                () -> 
testDriver.getStateStore(globalVersionedKeyValueStoreName));
+            assertThat(
+                e.getMessage(),
+                equalTo("Store " + globalVersionedKeyValueStoreName
+                    + " is a versioned key-value store and should be accessed 
via `getVersionedKeyValueStore()`"));
+        }
     }
 
     final ProcessorSupplier<byte[], byte[], Void, Void> voidProcessorSupplier 
= () -> new Processor<byte[], byte[], Void, Void>() {
@@ -1086,11 +1137,13 @@ public abstract class TopologyTestDriverTest {
                                      final boolean persistent,
                                      final String keyValueStoreName,
                                      final String timestampedKeyValueStoreName,
+                                     final String versionedKeyValueStoreName,
                                      final String windowStoreName,
                                      final String timestampedWindowStoreName,
                                      final String sessionStoreName,
                                      final String globalKeyValueStoreName,
-                                     final String 
globalTimestampedKeyValueStoreName) {
+                                     final String 
globalTimestampedKeyValueStoreName,
+                                     final String 
globalVersionedKeyValueStoreName) {
 
         // add state stores
         topology.addStateStore(
@@ -1111,6 +1164,15 @@ public abstract class TopologyTestDriverTest {
                 Serdes.ByteArray()
             ),
             "processor");
+        if (persistent) { // versioned stores do not offer an in-memory 
version yet
+            topology.addStateStore(
+                Stores.versionedKeyValueStoreBuilder(
+                    
Stores.persistentVersionedKeyValueStore(versionedKeyValueStoreName, 
Duration.ofMillis(1000L)),
+                    Serdes.ByteArray(),
+                    Serdes.ByteArray()
+                ),
+                "processor");
+        }
         topology.addStateStore(
             Stores.windowStoreBuilder(
                 persistent ?
@@ -1177,6 +1239,20 @@ public abstract class TopologyTestDriverTest {
             "topicDummy2",
             "processorDummy2",
             voidProcessorSupplier);
+        if (persistent) { // versioned stores do not offer an in-memory 
version yet
+            topology.addGlobalStore(
+                Stores.versionedKeyValueStoreBuilder(
+                    
Stores.persistentVersionedKeyValueStore(globalVersionedKeyValueStoreName, 
Duration.ofMillis(1000L)),
+                    Serdes.ByteArray(),
+                    Serdes.ByteArray()
+                ).withLoggingDisabled(),
+                "sourceDummy3",
+                Serdes.ByteArray().deserializer(),
+                Serdes.ByteArray().deserializer(),
+                "topicDummy3",
+                "processorDummy3",
+                voidProcessorSupplier);
+        }
     }
 
     @Test


Reply via email to