This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new df59cc1a015 KAFKA-14491: [20/N] Add public-facing methods for
versioned stores (#13442)
df59cc1a015 is described below
commit df59cc1a0151accb82082baaa56543ea157830d2
Author: Victoria Xia <[email protected]>
AuthorDate: Wed Apr 5 12:27:53 2023 -0400
KAFKA-14491: [20/N] Add public-facing methods for versioned stores (#13442)
Until this PR, all the code added for KIP-889 for introducing versioned
stores to Kafka Streams has been accessible from internal packages only. This
PR exposes the stores via public Stores.java methods, and also updates the
TopologyTestDriver.
Reviewers: Matthias J. Sax <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../internals/KeyValueStoreMaterializer.java | 6 +-
.../org/apache/kafka/streams/state/Stores.java | 99 +++++++++++++++++++++-
.../VersionedKeyValueStoreIntegrationTest.java | 39 ++++-----
.../internals/InternalTopologyBuilderTest.java | 11 +--
.../org/apache/kafka/streams/state/StoresTest.java | 52 ++++++++++++
.../apache/kafka/streams/TopologyTestDriver.java | 35 ++++++++
.../kafka/streams/TopologyTestDriverTest.java | 82 +++++++++++++++++-
8 files changed, 284 insertions(+), 42 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 5c95c8d4284..6962d0a5c4a 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -236,7 +236,7 @@
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>
<suppress checks="NPathComplexity"
-
files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
+
files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest).java"/>
<suppress
checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index 33792dafb71..468846c7dee 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -61,11 +60,10 @@ public class KeyValueStoreMaterializer<K, V> {
final StoreBuilder<?> builder;
if (supplier instanceof VersionedBytesStoreSupplier) {
- builder = new VersionedKeyValueStoreBuilder<>(
+ builder = Stores.versionedKeyValueStoreBuilder(
(VersionedBytesStoreSupplier) supplier,
materialized.keySerde(),
- materialized.valueSerde(),
- Time.SYSTEM);
+ materialized.valueSerde());
} else {
builder = Stores.timestampedKeyValueStoreBuilder(
supplier,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 17fbbb46d35..ea6e4cd71d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -26,10 +26,12 @@ import
org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
import
org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier;
import
org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier;
+import
org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
import
org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import java.time.Duration;
@@ -81,8 +83,10 @@ public final class Stores {
* Create a persistent {@link KeyValueBytesStoreSupplier}.
* <p>
* This store supplier can be passed into a {@link
#keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
- * If you want to create a {@link TimestampedKeyValueStore} you should use
- * {@link #persistentTimestampedKeyValueStore(String)} to create a store
supplier instead.
+ * If you want to create a {@link TimestampedKeyValueStore} or {@link
VersionedKeyValueStore}
+ * you should use {@link #persistentTimestampedKeyValueStore(String)} or
+ * {@link #persistentVersionedKeyValueStore(String, Duration)},
respectively,
+ * to create a store supplier instead.
*
* @param name name of the store (cannot be {@code null})
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be
used
@@ -98,8 +102,10 @@ public final class Stores {
* <p>
* This store supplier can be passed into a
* {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier,
Serde, Serde)}.
- * If you want to create a {@link KeyValueStore} you should use
- * {@link #persistentKeyValueStore(String)} to create a store supplier
instead.
+ * If you want to create a {@link KeyValueStore} or a {@link
VersionedKeyValueStore}
+ * you should use {@link #persistentKeyValueStore(String)} or
+ * {@link #persistentVersionedKeyValueStore(String, Duration)},
respectively,
+ * to create a store supplier instead.
*
* @param name name of the store (cannot be {@code null})
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be
used
@@ -110,6 +116,73 @@ public final class Stores {
return new RocksDbKeyValueBytesStoreSupplier(name, true);
}
+ /**
+ * Create a persistent versioned key-value store {@link
VersionedBytesStoreSupplier}.
+ * <p>
+ * This store supplier can be passed into a
+ * {@link #versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier,
Serde, Serde)}.
+ *
+ * @param name name of the store (cannot be {@code null})
+ * @param historyRetention length of time that old record versions are
available for query
+ * (cannot be negative). If a timestamp bound
provided to
+ * {@link VersionedKeyValueStore#get(Object,
long)} is older than this
+ * specified history retention, then the get
operation will not return data.
+ * This parameter also determines the "grace
period" after which
+ * out-of-order writes will no longer be accepted.
+ * @return an instance of {@link VersionedBytesStoreSupplier}
+ * @throws IllegalArgumentException if {@code historyRetention} is negative or
can't be represented as {@code long milliseconds}
+ */
+ public static VersionedBytesStoreSupplier
persistentVersionedKeyValueStore(final String name,
+
final Duration historyRetention) {
+ Objects.requireNonNull(name, "name cannot be null");
+ final String hrMsgPrefix =
prepareMillisCheckFailMsgPrefix(historyRetention, "historyRetention");
+ final long historyRetentionMs =
validateMillisecondDuration(historyRetention, hrMsgPrefix);
+ if (historyRetentionMs < 0L) {
+ throw new IllegalArgumentException("historyRetention cannot be
negative");
+ }
+ return new RocksDbVersionedKeyValueBytesStoreSupplier(name,
historyRetentionMs);
+ }
+
+ /**
+ * Create a persistent versioned key-value store {@link
VersionedBytesStoreSupplier}.
+ * <p>
+ * This store supplier can be passed into a
+ * {@link #versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier,
Serde, Serde)}.
+ *
+ * @param name name of the store (cannot be {@code null})
+ * @param historyRetention length of time that old record versions are
available for query
+ * (cannot be negative). If a timestamp bound
provided to
+ * {@link VersionedKeyValueStore#get(Object,
long)} is older than this
+ * specified history retention, then the get
operation will not return data.
+ * This parameter also determines the "grace
period" after which
+ * out-of-order writes will no longer be accepted.
+ * @param segmentInterval size of segments for storing old record
versions (must be positive). Old record versions
+ * for the same key in a single segment are stored
(updated and accessed) together.
+ * The only impact of this parameter is
performance. If segments are large
+ * and a workload results in many record versions
for the same key being collected
+ * in a single segment, performance may degrade as
a result. On the other hand,
+ * historical reads (which access older segments)
and out-of-order writes may
+ * slow down if there are too many segments.
+ * @return an instance of {@link VersionedBytesStoreSupplier}
+ * @throws IllegalArgumentException if {@code historyRetention} is negative, {@code
segmentInterval} is zero or negative, or either can't be represented as {@code long milliseconds}
+ */
+ public static VersionedBytesStoreSupplier
persistentVersionedKeyValueStore(final String name,
+
final Duration historyRetention,
+
final Duration segmentInterval) {
+ Objects.requireNonNull(name, "name cannot be null");
+ final String hrMsgPrefix =
prepareMillisCheckFailMsgPrefix(historyRetention, "historyRetention");
+ final long historyRetentionMs =
validateMillisecondDuration(historyRetention, hrMsgPrefix);
+ if (historyRetentionMs < 0L) {
+ throw new IllegalArgumentException("historyRetention cannot be
negative");
+ }
+ final String siMsgPrefix =
prepareMillisCheckFailMsgPrefix(segmentInterval, "segmentInterval");
+ final long segmentIntervalMs =
validateMillisecondDuration(segmentInterval, siMsgPrefix);
+ if (segmentIntervalMs < 1L) {
+ throw new IllegalArgumentException("segmentInterval cannot be zero
or negative");
+ }
+ return new RocksDbVersionedKeyValueBytesStoreSupplier(name,
historyRetentionMs, segmentIntervalMs);
+ }
+
/**
* Create an in-memory {@link KeyValueBytesStoreSupplier}.
* <p>
@@ -388,6 +461,24 @@ public final class Stores {
return new TimestampedKeyValueStoreBuilder<>(supplier, keySerde,
valueSerde, Time.SYSTEM);
}
+ /**
+ * Creates a {@link StoreBuilder} that can be used to build a {@link
VersionedKeyValueStore}.
+ *
+ * @param supplier a {@link VersionedBytesStoreSupplier} (cannot be
{@code null})
+ * @param keySerde the key serde to use
+ * @param valueSerde the value serde to use; if the serialized bytes are
{@code null} for put operations,
+ * the put is treated as a deletion
+ * @param <K> key type
+ * @param <V> value type
+ * @return an instance of a {@link StoreBuilder} that can build a {@link
VersionedKeyValueStore}
+ */
+ public static <K, V> StoreBuilder<VersionedKeyValueStore<K, V>>
versionedKeyValueStoreBuilder(final VersionedBytesStoreSupplier supplier,
+
final Serde<K> keySerde,
+
final Serde<V> valueSerde) {
+ Objects.requireNonNull(supplier, "supplier cannot be null");
+ return new VersionedKeyValueStoreBuilder<>(supplier, keySerde,
valueSerde, Time.SYSTEM);
+ }
+
/**
* Creates a {@link StoreBuilder} that can be used to build a {@link
WindowStore}.
* <p>
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java
index 9469534a09e..39d5d526649 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java
@@ -35,7 +35,6 @@ import
org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@@ -59,11 +58,10 @@ import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;
-import
org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
import
org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
@@ -131,11 +129,10 @@ public class VersionedKeyValueStoreIntegrationTest {
streamsBuilder
.addStateStore(
- new VersionedKeyValueStoreBuilder<>(
- new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME,
HISTORY_RETENTION),
+ Stores.versionedKeyValueStoreBuilder(
+ Stores.persistentVersionedKeyValueStore(STORE_NAME,
Duration.ofMillis(HISTORY_RETENTION)),
Serdes.Integer(),
- Serdes.String(),
- Time.SYSTEM
+ Serdes.String()
)
)
.stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
@@ -176,11 +173,10 @@ public class VersionedKeyValueStoreIntegrationTest {
streamsBuilder
.addStateStore(
- new VersionedKeyValueStoreBuilder<>(
- new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME,
HISTORY_RETENTION),
+ Stores.versionedKeyValueStoreBuilder(
+ Stores.persistentVersionedKeyValueStore(STORE_NAME,
Duration.ofMillis(HISTORY_RETENTION)),
Serdes.Integer(),
- Serdes.String(),
- Time.SYSTEM
+ Serdes.String()
)
)
.stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
@@ -216,11 +212,10 @@ public class VersionedKeyValueStoreIntegrationTest {
streamsBuilder
.addStateStore(
- new VersionedKeyValueStoreBuilder<>(
- new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME,
HISTORY_RETENTION),
+ Stores.versionedKeyValueStoreBuilder(
+ Stores.persistentVersionedKeyValueStore(STORE_NAME,
Duration.ofMillis(HISTORY_RETENTION)),
Serdes.Integer(),
- Serdes.String(),
- Time.SYSTEM
+ Serdes.String()
)
)
.stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
@@ -259,11 +254,10 @@ public class VersionedKeyValueStoreIntegrationTest {
streamsBuilder
.addStateStore(
- new VersionedKeyValueStoreBuilder<>(
- new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME,
HISTORY_RETENTION),
+ Stores.versionedKeyValueStoreBuilder(
+ Stores.persistentVersionedKeyValueStore(STORE_NAME,
Duration.ofMillis(HISTORY_RETENTION)),
Serdes.Integer(),
- Serdes.String(),
- Time.SYSTEM
+ Serdes.String()
)
)
.stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
@@ -298,11 +292,10 @@ public class VersionedKeyValueStoreIntegrationTest {
streamsBuilder
.addStateStore(
- new VersionedKeyValueStoreBuilder<>(
+ Stores.versionedKeyValueStoreBuilder(
new CustomIQv2VersionedStoreSupplier(),
Serdes.Integer(),
- Serdes.String(),
- Time.SYSTEM
+ Serdes.String()
)
)
.stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
@@ -338,7 +331,7 @@ public class VersionedKeyValueStoreIntegrationTest {
globalTableTopic,
Consumed.with(Serdes.Integer(), Serdes.String()),
Materialized
- .<Integer, String>as(new
RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION))
+ .<Integer,
String>as(Stores.persistentVersionedKeyValueStore(STORE_NAME,
Duration.ofMillis(HISTORY_RETENTION)))
.withKeySerde(Serdes.Integer())
.withValueSerde(Serdes.String()));
streamsBuilder
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 4ff86cbe6b4..fbf0b91954e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -16,11 +16,11 @@
*/
package org.apache.kafka.streams.processor.internals;
+import java.time.Duration;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
@@ -37,8 +37,6 @@ import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
-import
org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
@@ -909,11 +907,10 @@ public class InternalTopologyBuilderTest {
builder.addSource(null, "source", null, null, null, "topic");
builder.addProcessor("processor", new MockApiProcessorSupplier<>(),
"source");
builder.addStateStore(
- new VersionedKeyValueStoreBuilder<>(
- new RocksDbVersionedKeyValueBytesStoreSupplier("vstore",
60_000L),
+ Stores.versionedKeyValueStoreBuilder(
+ Stores.persistentVersionedKeyValueStore("vstore",
Duration.ofMillis(60_000L)),
Serdes.String(),
- Serdes.String(),
- new MockTime()
+ Serdes.String()
),
"processor"
);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 90f019aa7cc..98726f888fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
@@ -32,6 +33,7 @@ import org.junit.Test;
import static java.time.Duration.ZERO;
import static java.time.Duration.ofMillis;
import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
@@ -53,6 +55,33 @@ public class StoresTest {
assertEquals("name cannot be null", e.getMessage());
}
+ @Test
+ public void shouldThrowIfPersistentVersionedKeyValueStoreStoreNameIsNull()
{
+ Exception e = assertThrows(NullPointerException.class, () ->
Stores.persistentVersionedKeyValueStore(null, ZERO));
+ assertEquals("name cannot be null", e.getMessage());
+
+ e = assertThrows(NullPointerException.class, () ->
Stores.persistentVersionedKeyValueStore(null, ZERO, ofMillis(1)));
+ assertEquals("name cannot be null", e.getMessage());
+ }
+
+ @Test
+ public void
shouldThrowIfPersistentVersionedKeyValueStoreHistoryRetentionIsNegative() {
+ Exception e = assertThrows(IllegalArgumentException.class, () ->
Stores.persistentVersionedKeyValueStore("anyName", ofMillis(-1)));
+ assertEquals("historyRetention cannot be negative", e.getMessage());
+
+ e = assertThrows(IllegalArgumentException.class, () ->
Stores.persistentVersionedKeyValueStore("anyName", ofMillis(-1), ofMillis(1)));
+ assertEquals("historyRetention cannot be negative", e.getMessage());
+ }
+
+ @Test
+ public void
shouldThrowIfPersistentVersionedKeyValueStoreSegmentIntervalIsZeroOrNegative() {
+ Exception e = assertThrows(IllegalArgumentException.class, () ->
Stores.persistentVersionedKeyValueStore("anyName", ZERO, ZERO));
+ assertEquals("segmentInterval cannot be zero or negative",
e.getMessage());
+
+ e = assertThrows(IllegalArgumentException.class, () ->
Stores.persistentVersionedKeyValueStore("anyName", ZERO, ofMillis(-1)));
+ assertEquals("segmentInterval cannot be zero or negative",
e.getMessage());
+ }
+
@Test
public void shouldThrowIfIMemoryKeyValueStoreStoreNameIsNull() {
final Exception e = assertThrows(NullPointerException.class, () ->
Stores.inMemoryKeyValueStore(null));
@@ -131,6 +160,12 @@ public class StoresTest {
assertEquals("supplier cannot be null", e.getMessage());
}
+ @Test
+ public void shouldThrowIfSupplierIsNullForVersionedKeyValueStoreBuilder() {
+ final Exception e = assertThrows(NullPointerException.class, () ->
Stores.versionedKeyValueStoreBuilder(null, Serdes.ByteArray(),
Serdes.ByteArray()));
+ assertEquals("supplier cannot be null", e.getMessage());
+ }
+
@Test
public void shouldThrowIfSupplierIsNullForSessionStoreBuilder() {
final Exception e = assertThrows(NullPointerException.class, () ->
Stores.sessionStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray()));
@@ -159,6 +194,13 @@ public class StoresTest {
assertThat(Stores.persistentTimestampedKeyValueStore("store").get(),
instanceOf(RocksDBTimestampedStore.class));
}
+ @Test
+ public void shouldCreateRocksDbVersionedStore() {
+ final KeyValueStore<Bytes, byte[]> store =
Stores.persistentVersionedKeyValueStore("store", ofMillis(1)).get();
+ assertThat(store, instanceOf(VersionedBytesStore.class));
+ assertThat(store.persistent(), equalTo(true));
+ }
+
@Test
public void shouldCreateRocksDbWindowStore() {
final WindowStore store = Stores.persistentWindowStore("store",
ofMillis(1L), ofMillis(1L), false).get();
@@ -221,6 +263,16 @@ public class StoresTest {
assertThat(((WrappedStateStore) store).wrapped(),
instanceOf(TimestampedBytesStore.class));
}
+ @Test
+ public void shouldBuildVersionedKeyValueStore() {
+ final VersionedKeyValueStore<String, String> store =
Stores.versionedKeyValueStoreBuilder(
+ Stores.persistentVersionedKeyValueStore("name", ofMillis(1)),
+ Serdes.String(),
+ Serdes.String()
+ ).build();
+ assertThat(store, not(nullValue()));
+ }
+
@Test
public void shouldBuildWindowStore() {
final WindowStore<String, String> store = Stores.windowStoreBuilder(
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 96e8030d8fd..14123c36404 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -81,6 +81,7 @@ import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.ReadOnlyKeyValueStoreFacade;
@@ -876,6 +877,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
+ * @see #getVersionedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
@@ -906,6 +908,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getAllStateStores()
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
+ * @see #getVersionedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
@@ -941,6 +944,10 @@ public class TopologyTestDriver implements Closeable {
}
private void throwIfBuiltInStore(final StateStore stateStore) {
+ if (stateStore instanceof VersionedKeyValueStore) {
+ throw new IllegalArgumentException("Store " + stateStore.name()
+ + " is a versioned
key-value store and should be accessed via `getVersionedKeyValueStore()`");
+ }
if (stateStore instanceof TimestampedKeyValueStore) {
throw new IllegalArgumentException("Store " + stateStore.name()
+ " is a timestamped
key-value store and should be accessed via `getTimestampedKeyValueStore()`");
@@ -980,6 +987,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getTimestampedKeyValueStore(String)
+ * @see #getVersionedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
@@ -1006,6 +1014,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
+ * @see #getVersionedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
@@ -1016,6 +1025,29 @@ public class TopologyTestDriver implements Closeable {
return store instanceof TimestampedKeyValueStore ?
(TimestampedKeyValueStore<K, V>) store : null;
}
+ /**
+ * Get the {@link VersionedKeyValueStore} with the given name.
+ * The store can be a "regular" or global store.
+ * <p>
+ * This is often useful in test cases to pre-populate the store before the
test case instructs the topology to
+ * {@link TestInputTopic#pipeInput(TestRecord) process an input message},
and/or to check the store afterward.
+ *
+ * @param name the name of the store
+ * @return the versioned key-value store, or {@code null} if no {@link
VersionedKeyValueStore} has been registered with the given name
+ * @see #getAllStateStores()
+ * @see #getStateStore(String)
+ * @see #getKeyValueStore(String)
+ * @see #getTimestampedKeyValueStore(String)
+ * @see #getWindowStore(String)
+ * @see #getTimestampedWindowStore(String)
+ * @see #getSessionStore(String)
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V> VersionedKeyValueStore<K, V> getVersionedKeyValueStore(final
String name) {
+ final StateStore store = getStateStore(name, false);
+ return store instanceof VersionedKeyValueStore ?
(VersionedKeyValueStore<K, V>) store : null;
+ }
+
/**
* Get the {@link WindowStore} or {@link TimestampedWindowStore} with the
given name.
* The store can be a "regular" or global store.
@@ -1034,6 +1066,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
+ * @see #getVersionedKeyValueStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
*/
@@ -1060,6 +1093,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
+ * @see #getVersionedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getSessionStore(String)
*/
@@ -1082,6 +1116,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
+ * @see #getVersionedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
*/
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index a744bc12440..aa940f8539e 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -911,11 +911,13 @@ public abstract class TopologyTestDriverTest {
private void shouldReturnCorrectStoreTypeOnly(final boolean persistent) {
final String keyValueStoreName = "keyValueStore";
final String timestampedKeyValueStoreName = "keyValueTimestampStore";
+ final String versionedKeyValueStoreName = "keyValueVersionedStore";
final String windowStoreName = "windowStore";
final String timestampedWindowStoreName = "windowTimestampStore";
final String sessionStoreName = "sessionStore";
final String globalKeyValueStoreName = "globalKeyValueStore";
final String globalTimestampedKeyValueStoreName =
"globalKeyValueTimestampStore";
+ final String globalVersionedKeyValueStoreName =
"globalKeyValueVersionedStore";
final Topology topology = setupSingleProcessorTopology();
addStoresToTopology(
@@ -923,11 +925,13 @@ public abstract class TopologyTestDriverTest {
persistent,
keyValueStoreName,
timestampedKeyValueStoreName,
+ versionedKeyValueStoreName,
windowStoreName,
timestampedWindowStoreName,
sessionStoreName,
globalKeyValueStoreName,
- globalTimestampedKeyValueStoreName);
+ globalTimestampedKeyValueStoreName,
+ globalVersionedKeyValueStoreName);
testDriver = new TopologyTestDriver(topology, config);
@@ -935,30 +939,44 @@ public abstract class TopologyTestDriverTest {
// verify state stores
assertNotNull(testDriver.getKeyValueStore(keyValueStoreName));
assertNull(testDriver.getTimestampedKeyValueStore(keyValueStoreName));
+ assertNull(testDriver.getVersionedKeyValueStore(keyValueStoreName));
assertNull(testDriver.getWindowStore(keyValueStoreName));
assertNull(testDriver.getTimestampedWindowStore(keyValueStoreName));
assertNull(testDriver.getSessionStore(keyValueStoreName));
assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreName));
assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreName));
+
assertNull(testDriver.getVersionedKeyValueStore(timestampedKeyValueStoreName));
assertNull(testDriver.getWindowStore(timestampedKeyValueStoreName));
assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreName));
assertNull(testDriver.getSessionStore(timestampedKeyValueStoreName));
+ if (persistent) { // versioned stores do not offer an in-memory
version yet, so nothing to test/verify unless persistent
+
assertNull(testDriver.getKeyValueStore(versionedKeyValueStoreName));
+
assertNull(testDriver.getTimestampedKeyValueStore(versionedKeyValueStoreName));
+
assertNotNull(testDriver.getVersionedKeyValueStore(versionedKeyValueStoreName));
+ assertNull(testDriver.getWindowStore(versionedKeyValueStoreName));
+
assertNull(testDriver.getTimestampedWindowStore(versionedKeyValueStoreName));
+ assertNull(testDriver.getSessionStore(versionedKeyValueStoreName));
+ }
+
assertNull(testDriver.getKeyValueStore(windowStoreName));
assertNull(testDriver.getTimestampedKeyValueStore(windowStoreName));
+ assertNull(testDriver.getVersionedKeyValueStore(windowStoreName));
assertNotNull(testDriver.getWindowStore(windowStoreName));
assertNull(testDriver.getTimestampedWindowStore(windowStoreName));
assertNull(testDriver.getSessionStore(windowStoreName));
assertNull(testDriver.getKeyValueStore(timestampedWindowStoreName));
assertNull(testDriver.getTimestampedKeyValueStore(timestampedWindowStoreName));
+
assertNull(testDriver.getVersionedKeyValueStore(timestampedWindowStoreName));
assertNotNull(testDriver.getWindowStore(timestampedWindowStoreName));
assertNotNull(testDriver.getTimestampedWindowStore(timestampedWindowStoreName));
assertNull(testDriver.getSessionStore(timestampedWindowStoreName));
assertNull(testDriver.getKeyValueStore(sessionStoreName));
assertNull(testDriver.getTimestampedKeyValueStore(sessionStoreName));
+ assertNull(testDriver.getVersionedKeyValueStore(sessionStoreName));
assertNull(testDriver.getWindowStore(sessionStoreName));
assertNull(testDriver.getTimestampedWindowStore(sessionStoreName));
assertNotNull(testDriver.getSessionStore(sessionStoreName));
@@ -966,15 +984,26 @@ public abstract class TopologyTestDriverTest {
// verify global stores
assertNotNull(testDriver.getKeyValueStore(globalKeyValueStoreName));
assertNull(testDriver.getTimestampedKeyValueStore(globalKeyValueStoreName));
+ assertNull(testDriver.getVersionedKeyValueStore(globalKeyValueStoreName));
assertNull(testDriver.getWindowStore(globalKeyValueStoreName));
assertNull(testDriver.getTimestampedWindowStore(globalKeyValueStoreName));
assertNull(testDriver.getSessionStore(globalKeyValueStoreName));
assertNotNull(testDriver.getKeyValueStore(globalTimestampedKeyValueStoreName));
assertNotNull(testDriver.getTimestampedKeyValueStore(globalTimestampedKeyValueStoreName));
+ assertNull(testDriver.getVersionedKeyValueStore(globalTimestampedKeyValueStoreName));
assertNull(testDriver.getWindowStore(globalTimestampedKeyValueStoreName));
assertNull(testDriver.getTimestampedWindowStore(globalTimestampedKeyValueStoreName));
assertNull(testDriver.getSessionStore(globalTimestampedKeyValueStoreName));
+
+ if (persistent) { // versioned stores do not offer an in-memory version yet, so nothing to test/verify unless persistent
+ assertNull(testDriver.getKeyValueStore(globalVersionedKeyValueStoreName));
+ assertNull(testDriver.getTimestampedKeyValueStore(globalVersionedKeyValueStoreName));
+ assertNotNull(testDriver.getVersionedKeyValueStore(globalVersionedKeyValueStoreName));
+ assertNull(testDriver.getWindowStore(globalVersionedKeyValueStoreName));
+ assertNull(testDriver.getTimestampedWindowStore(globalVersionedKeyValueStoreName));
+ assertNull(testDriver.getSessionStore(globalVersionedKeyValueStoreName));
+ }
}
@Test
@@ -990,11 +1019,13 @@ public abstract class TopologyTestDriverTest {
private void shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(final boolean persistent) {
final String keyValueStoreName = "keyValueStore";
final String timestampedKeyValueStoreName = "keyValueTimestampStore";
+ final String versionedKeyValueStoreName = "keyValueVersionedStore";
final String windowStoreName = "windowStore";
final String timestampedWindowStoreName = "windowTimestampStore";
final String sessionStoreName = "sessionStore";
final String globalKeyValueStoreName = "globalKeyValueStore";
final String globalTimestampedKeyValueStoreName = "globalKeyValueTimestampStore";
+ final String globalVersionedKeyValueStoreName = "globalKeyValueVersionedStore";
final Topology topology = setupSingleProcessorTopology();
addStoresToTopology(
@@ -1002,11 +1033,13 @@ public abstract class TopologyTestDriverTest {
persistent,
keyValueStoreName,
timestampedKeyValueStoreName,
+ versionedKeyValueStoreName,
windowStoreName,
timestampedWindowStoreName,
sessionStoreName,
globalKeyValueStoreName,
- globalTimestampedKeyValueStoreName);
+ globalTimestampedKeyValueStoreName,
+ globalVersionedKeyValueStoreName);
testDriver = new TopologyTestDriver(topology, config);
@@ -1029,6 +1062,15 @@ public abstract class TopologyTestDriverTest {
equalTo("Store " + timestampedKeyValueStoreName
+ " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`"));
}
+ if (persistent) { // versioned stores do not offer an in-memory version yet, so nothing to test/verify unless persistent
+ final IllegalArgumentException e = assertThrows(
+ IllegalArgumentException.class,
+ () -> testDriver.getStateStore(versionedKeyValueStoreName));
+ assertThat(
+ e.getMessage(),
+ equalTo("Store " + versionedKeyValueStoreName
+ + " is a versioned key-value store and should be accessed via `getVersionedKeyValueStore()`"));
+ }
{
final IllegalArgumentException e = assertThrows(
IllegalArgumentException.class,
@@ -1074,6 +1116,15 @@ public abstract class TopologyTestDriverTest {
equalTo("Store " + globalTimestampedKeyValueStoreName
+ " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`"));
}
+ if (persistent) { // versioned stores do not offer an in-memory version yet, so nothing to test/verify unless persistent
+ final IllegalArgumentException e = assertThrows(
+ IllegalArgumentException.class,
+ () -> testDriver.getStateStore(globalVersionedKeyValueStoreName));
+ assertThat(
+ e.getMessage(),
+ equalTo("Store " + globalVersionedKeyValueStoreName
+ + " is a versioned key-value store and should be accessed via `getVersionedKeyValueStore()`"));
+ }
}
final ProcessorSupplier<byte[], byte[], Void, Void> voidProcessorSupplier = () -> new Processor<byte[], byte[], Void, Void>() {
@@ -1086,11 +1137,13 @@ public abstract class TopologyTestDriverTest {
final boolean persistent,
final String keyValueStoreName,
final String timestampedKeyValueStoreName,
+ final String versionedKeyValueStoreName,
final String windowStoreName,
final String timestampedWindowStoreName,
final String sessionStoreName,
final String globalKeyValueStoreName,
- final String globalTimestampedKeyValueStoreName) {
+ final String globalTimestampedKeyValueStoreName,
+ final String globalVersionedKeyValueStoreName) {
// add state stores
topology.addStateStore(
@@ -1111,6 +1164,15 @@ public abstract class TopologyTestDriverTest {
Serdes.ByteArray()
),
"processor");
+ if (persistent) { // versioned stores do not offer an in-memory version yet
+ topology.addStateStore(
+ Stores.versionedKeyValueStoreBuilder(
+ Stores.persistentVersionedKeyValueStore(versionedKeyValueStoreName, Duration.ofMillis(1000L)),
+ Serdes.ByteArray(),
+ Serdes.ByteArray()
+ ),
+ "processor");
+ }
topology.addStateStore(
Stores.windowStoreBuilder(
persistent ?
@@ -1177,6 +1239,20 @@ public abstract class TopologyTestDriverTest {
"topicDummy2",
"processorDummy2",
voidProcessorSupplier);
+ if (persistent) { // versioned stores do not offer an in-memory version yet
+ topology.addGlobalStore(
+ Stores.versionedKeyValueStoreBuilder(
+ Stores.persistentVersionedKeyValueStore(globalVersionedKeyValueStoreName, Duration.ofMillis(1000L)),
+ Serdes.ByteArray(),
+ Serdes.ByteArray()
+ ).withLoggingDisabled(),
+ "sourceDummy3",
+ Serdes.ByteArray().deserializer(),
+ Serdes.ByteArray().deserializer(),
+ "topicDummy3",
+ "processorDummy3",
+ voidProcessorSupplier);
+ }
}
@Test