This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0b1a118 KAFKA-6813: Remove deprecated APIs in KIP-182, Part II (#4976)
0b1a118 is described below
commit 0b1a118f45418aba6af03e71e7169e38cb3ec9af
Author: Guozhang Wang <[email protected]>
AuthorDate: Wed May 9 17:13:05 2018 -0700
KAFKA-6813: Remove deprecated APIs in KIP-182, Part II (#4976)
1. Remove the deprecated StateStoreSuppliers and the corresponding
Stores.create() functions and factories. Only the base StateStoreSupplier and
MockStoreSupplier are preserved, as they are still needed by the deprecated
TopologyBuilder and KStreamBuilder; they will be removed in a follow-up PR.
2. Add TopologyWrapper.java: the original InternalTopologyBuilderAccessor
was removed, but such an accessor turned out to still be needed for now.
3. Minor: remove StateStoreTestUtils.java and inline its logic in its
callers, since with StoreBuilder it is now just a one-liner.
Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../streams/kstream/internals/AbstractStream.java | 39 --
.../org/apache/kafka/streams/state/Stores.java | 414 ---------------------
.../internals/InMemoryKeyValueStoreSupplier.java | 53 ---
.../internals/InMemoryLRUCacheStoreSupplier.java | 51 ---
.../streams/state/internals/MemoryLRUCache.java | 3 -
.../internals/RocksDBKeyValueStoreSupplier.java | 1 -
.../internals/RocksDBSessionStoreSupplier.java | 65 ----
.../streams/state/internals/RocksDBStore.java | 2 -
.../internals/RocksDBWindowStoreSupplier.java | 75 ----
.../RocksDbSessionBytesStoreSupplier.java | 8 +-
.../internals/RocksDbWindowBytesStoreSupplier.java | 7 +-
.../kafka/streams/state/internals/ThreadCache.java | 2 -
.../org/apache/kafka/streams/TopologyWrapper.java | 34 ++
.../integration/RegexSourceIntegrationTest.java | 23 +-
.../kafka/streams/kstream/KStreamBuilderTest.java | 2 +-
...KStreamSessionWindowAggregateProcessorTest.java | 22 +-
.../streams/processor/TopologyBuilderTest.java | 28 +-
.../internals/InternalTopologyBuilderTest.java | 102 +++--
.../processor/internals/ProcessorTopologyTest.java | 169 +++++----
.../processor/internals/StandbyTaskTest.java | 1 -
.../internals/StreamsPartitionAssignorTest.java | 13 +-
.../org/apache/kafka/streams/state/StoresTest.java | 69 +---
.../CompositeReadOnlyKeyValueStoreTest.java | 17 +-
.../RocksDBKeyValueStoreSupplierTest.java | 162 --------
.../internals/RocksDBSessionStoreSupplierTest.java | 139 -------
.../internals/RocksDBWindowStoreSupplierTest.java | 175 ---------
.../state/internals/StateStoreTestUtils.java | 56 ---
.../StreamThreadStateStoreProviderTest.java | 29 +-
.../state/internals/WrappingStoreProviderTest.java | 12 +-
29 files changed, 249 insertions(+), 1524 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 3c65399..497bdac 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.internals.Topic;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueTransformer;
@@ -26,12 +24,7 @@ import
org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.WindowStore;
import java.util.HashSet;
import java.util.Objects;
@@ -81,38 +74,6 @@ public abstract class AbstractStream<K> {
};
}
- @SuppressWarnings({"unchecked", "deprecation"})
- static <T, K>
org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore>
keyValueStore(final Serde<K> keySerde,
- final
Serde<T> aggValueSerde,
- final
String storeName) {
- Objects.requireNonNull(storeName, "storeName can't be null");
- Topic.validate(storeName);
- return storeFactory(keySerde, aggValueSerde, storeName).build();
- }
-
- @SuppressWarnings({"unchecked", "deprecation"})
- static <W extends Window, T, K>
org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore>
windowedStore(final Serde<K> keySerde,
-
final Serde<T> aggValSerde,
-
final Windows<W> windows,
-
final String storeName) {
- Objects.requireNonNull(storeName, "storeName can't be null");
- Topic.validate(storeName);
- return storeFactory(keySerde, aggValSerde, storeName)
- .windowed(windows.size(), windows.maintainMs(),
windows.segments, false)
- .build();
- }
-
- @SuppressWarnings("deprecation")
- static <T, K> Stores.PersistentKeyValueFactory<K, T> storeFactory(final
Serde<K> keySerde,
- final
Serde<T> aggValueSerde,
- final
String storeName) {
- return Stores.create(storeName)
- .withKeys(keySerde)
- .withValues(aggValueSerde)
- .persistent()
- .enableCaching();
- }
-
static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final
ValueMapper<V, VR> valueMapper) {
Objects.requireNonNull(valueMapper, "valueMapper can't be null");
return new ValueMapperWithKey<K, V, VR>() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index daa2915..27b985b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -22,13 +22,8 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
-import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
-import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
-import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
-import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
-import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import
org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier;
import
org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier;
import
org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
@@ -37,9 +32,6 @@ import
org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Objects;
/**
@@ -244,411 +236,5 @@ public class Stores {
Objects.requireNonNull(supplier, "supplier cannot be null");
return new SessionStoreBuilder<>(supplier, keySerde, valueSerde,
Time.SYSTEM);
}
-
- /**
- * Begin to create a new {@link
org.apache.kafka.streams.processor.StateStoreSupplier} instance.
- *
- * @param name the name of the store
- * @return the factory that can be used to specify other options or
configurations for the store; never null
- * @deprecated use {@link #persistentKeyValueStore(String)}, {@link
#persistentWindowStore(String, long, int, long, boolean)}
- * {@link #persistentSessionStore(String, long)}, {@link #lruMap(String,
int)}, or {@link #inMemoryKeyValueStore(String)}
- */
- @Deprecated
- public static StoreFactory create(final String name) {
- return new StoreFactory() {
- @Override
- public <K> ValueFactory<K> withKeys(final Serde<K> keySerde) {
- return new ValueFactory<K>() {
- @Override
- public <V> KeyValueFactory<K, V> withValues(final Serde<V>
valueSerde) {
-
- return new KeyValueFactory<K, V>() {
-
- @Override
- public InMemoryKeyValueFactory<K, V> inMemory() {
- return new InMemoryKeyValueFactory<K, V>() {
- private int capacity = Integer.MAX_VALUE;
- private final Map<String, String>
logConfig = new HashMap<>();
- private boolean logged = true;
-
- /**
- * @param capacity the maximum capacity of
the in-memory cache; should be one less than a power of 2
- * @throws IllegalArgumentException if the
capacity of the store is zero or negative
- */
- @Override
- public InMemoryKeyValueFactory<K, V>
maxEntries(int capacity) {
- if (capacity < 1) throw new
IllegalArgumentException("The capacity must be positive");
- this.capacity = capacity;
- return this;
- }
-
- @Override
- public InMemoryKeyValueFactory<K, V>
enableLogging(final Map<String, String> config) {
- logged = true;
- logConfig.putAll(config);
- return this;
- }
-
- @Override
- public InMemoryKeyValueFactory<K, V>
disableLogging() {
- logged = false;
- logConfig.clear();
- return this;
- }
-
- @Override
- public
org.apache.kafka.streams.processor.StateStoreSupplier build() {
- log.trace("Defining InMemory Store
name={} capacity={} logged={}", name, capacity, logged);
- if (capacity < Integer.MAX_VALUE) {
- return new
InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged,
logConfig);
- }
- return new
InMemoryKeyValueStoreSupplier<>(name, keySerde, valueSerde, logged, logConfig);
- }
- };
- }
-
- @Override
- public PersistentKeyValueFactory<K, V>
persistent() {
- return new PersistentKeyValueFactory<K, V>() {
- boolean cachingEnabled;
- private long windowSize;
- private final Map<String, String>
logConfig = new HashMap<>();
- private int numSegments = 0;
- private long retentionPeriod = 0L;
- private boolean retainDuplicates = false;
- private boolean sessionWindows;
- private boolean logged = true;
-
- @Override
- public PersistentKeyValueFactory<K, V>
windowed(final long windowSize, final long retentionPeriod, final int
numSegments, final boolean retainDuplicates) {
- if (numSegments <
RocksDBWindowStoreSupplier.MIN_SEGMENTS) {
- throw new
IllegalArgumentException("numSegments must be >= " +
RocksDBWindowStoreSupplier.MIN_SEGMENTS);
- }
- this.windowSize = windowSize;
- this.numSegments = numSegments;
- this.retentionPeriod = retentionPeriod;
- this.retainDuplicates =
retainDuplicates;
- this.sessionWindows = false;
-
- return this;
- }
-
- @Override
- public PersistentKeyValueFactory<K, V>
sessionWindowed(final long retentionPeriod) {
- this.sessionWindows = true;
- this.retentionPeriod = retentionPeriod;
- return this;
- }
-
- @Override
- public PersistentKeyValueFactory<K, V>
enableLogging(final Map<String, String> config) {
- logged = true;
- logConfig.putAll(config);
- return this;
- }
-
- @Override
- public PersistentKeyValueFactory<K, V>
disableLogging() {
- logged = false;
- logConfig.clear();
- return this;
- }
-
- @Override
- public PersistentKeyValueFactory<K, V>
enableCaching() {
- cachingEnabled = true;
- return this;
- }
-
- @Override
- public
org.apache.kafka.streams.processor.StateStoreSupplier build() {
- log.trace("Defining RocksDb Store
name={} numSegments={} logged={}", name, numSegments, logged);
- if (sessionWindows) {
- return new
RocksDBSessionStoreSupplier<>(name, retentionPeriod, keySerde, valueSerde,
logged, logConfig, cachingEnabled);
- } else if (numSegments > 0) {
- return new
RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments,
retainDuplicates, keySerde, valueSerde, windowSize, logged, logConfig,
cachingEnabled);
- }
- return new
RocksDBKeyValueStoreSupplier<>(name, keySerde, valueSerde, logged, logConfig,
cachingEnabled);
- }
-
- };
- }
-
-
- };
- }
- };
- }
- };
- }
-
-
- public static abstract class StoreFactory {
- /**
- * Begin to create a {@link KeyValueStore} by specifying the keys will
be {@link String}s.
- *
- * @return the interface used to specify the type of values; never null
- */
- public ValueFactory<String> withStringKeys() {
- return withKeys(Serdes.String());
- }
-
- /**
- * Begin to create a {@link KeyValueStore} by specifying the keys will
be {@link Integer}s.
- *
- * @return the interface used to specify the type of values; never null
- */
- public ValueFactory<Integer> withIntegerKeys() {
- return withKeys(Serdes.Integer());
- }
-
- /**
- * Begin to create a {@link KeyValueStore} by specifying the keys will
be {@link Long}s.
- *
- * @return the interface used to specify the type of values; never null
- */
- public ValueFactory<Long> withLongKeys() {
- return withKeys(Serdes.Long());
- }
-
- /**
- * Begin to create a {@link KeyValueStore} by specifying the keys will
be {@link Double}s.
- *
- * @return the interface used to specify the type of values; never null
- */
- public ValueFactory<Double> withDoubleKeys() {
- return withKeys(Serdes.Double());
- }
-
- /**
- * Begin to create a {@link KeyValueStore} by specifying the keys will
be {@link ByteBuffer}.
- *
- * @return the interface used to specify the type of values; never null
- */
- public ValueFactory<ByteBuffer> withByteBufferKeys() {
- return withKeys(Serdes.ByteBuffer());
- }
-
- /**
- * Begin to create a {@link KeyValueStore} by specifying the keys will
be byte arrays.
- *
- * @return the interface used to specify the type of values; never null
- */
- public ValueFactory<byte[]> withByteArrayKeys() {
- return withKeys(Serdes.ByteArray());
- }
-
- /**
- * Begin to create a {@link KeyValueStore} by specifying the keys.
- *
- * @param keyClass the class for the keys, which must be one of the
types for which Kafka has built-in serdes
- * @return the interface used to specify the type of values; never null
- */
- public <K> ValueFactory<K> withKeys(Class<K> keyClass) {
- return withKeys(Serdes.serdeFrom(keyClass));
- }
-
- /**
- * Begin to create a {@link KeyValueStore} by specifying the
serializer and deserializer for the keys.
- *
- * @param keySerde the serialization factory for keys; may be null
- * @return the interface used to specify the type of values;
never null
- */
- public abstract <K> ValueFactory<K> withKeys(Serde<K> keySerde);
- }
-
- /**
- * The factory for creating off-heap key-value stores.
- *
- * @param <K> the type of keys
- */
- public static abstract class ValueFactory<K> {
- /**
- * Use {@link String} values.
- *
- * @return the interface used to specify the remaining key-value store
options; never null
- */
- public KeyValueFactory<K, String> withStringValues() {
- return withValues(Serdes.String());
- }
-
- /**
- * Use {@link Integer} values.
- *
- * @return the interface used to specify the remaining key-value store
options; never null
- */
- public KeyValueFactory<K, Integer> withIntegerValues() {
- return withValues(Serdes.Integer());
- }
-
- /**
- * Use {@link Long} values.
- *
- * @return the interface used to specify the remaining key-value store
options; never null
- */
- public KeyValueFactory<K, Long> withLongValues() {
- return withValues(Serdes.Long());
- }
-
- /**
- * Use {@link Double} values.
- *
- * @return the interface used to specify the remaining key-value store
options; never null
- */
- public KeyValueFactory<K, Double> withDoubleValues() {
- return withValues(Serdes.Double());
- }
-
- /**
- * Use {@link ByteBuffer} for values.
- *
- * @return the interface used to specify the remaining key-value store
options; never null
- */
- public KeyValueFactory<K, ByteBuffer> withByteBufferValues() {
- return withValues(Serdes.ByteBuffer());
- }
-
- /**
- * Use byte arrays for values.
- *
- * @return the interface used to specify the remaining key-value store
options; never null
- */
- public KeyValueFactory<K, byte[]> withByteArrayValues() {
- return withValues(Serdes.ByteArray());
- }
-
- /**
- * Use values of the specified type.
- *
- * @param valueClass the class for the values, which must be one of
the types for which Kafka has built-in serdes
- * @return the interface used to specify the remaining key-value store
options; never null
- */
- public <V> KeyValueFactory<K, V> withValues(Class<V> valueClass) {
- return withValues(Serdes.serdeFrom(valueClass));
- }
-
- /**
- * Use the specified serializer and deserializer for the values.
- *
- * @param valueSerde the serialization factory for values; may be
null
- * @return the interface used to specify the remaining
key-value store options; never null
- */
- public abstract <V> KeyValueFactory<K, V> withValues(Serde<V>
valueSerde);
- }
-
-
- public interface KeyValueFactory<K, V> {
- /**
- * Keep all key-value entries in-memory, although for durability all
entries are recorded in a Kafka topic that can be
- * read to restore the entries if they are lost.
- *
- * @return the factory to create in-memory key-value stores; never null
- */
- InMemoryKeyValueFactory<K, V> inMemory();
-
- /**
- * Keep all key-value entries off-heap in a local database, although
for durability all entries are recorded in a Kafka
- * topic that can be read to restore the entries if they are lost.
- *
- * @return the factory to create persistent key-value stores; never
null
- */
- PersistentKeyValueFactory<K, V> persistent();
- }
-
- /**
- * The interface used to create in-memory key-value stores.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- */
- @Deprecated
- public interface InMemoryKeyValueFactory<K, V> {
- /**
- * Limits the in-memory key-value store to hold a maximum number of
entries. The default is {@link Integer#MAX_VALUE}, which is
- * equivalent to not placing a limit on the number of entries.
- *
- * @param capacity the maximum capacity of the in-memory cache; should
be one less than a power of 2
- * @return this factory
- * @throws IllegalArgumentException if the capacity is not positive
- */
- InMemoryKeyValueFactory<K, V> maxEntries(int capacity);
-
- /**
- * Indicates that a changelog should be created for the store. The
changelog will be created
- * with the provided cleanupPolicy and configs.
- *
- * Note: Any unrecognized configs will be ignored.
- * @param config any configs that should be applied to the changelog
- * @return the factory to create an in-memory key-value store
- */
- InMemoryKeyValueFactory<K, V> enableLogging(final Map<String, String>
config);
-
- /**
- * Indicates that a changelog should not be created for the key-value
store
- * @return the factory to create an in-memory key-value store
- */
- InMemoryKeyValueFactory<K, V> disableLogging();
-
-
- /**
- * Return the instance of StateStoreSupplier of new key-value store.
- * @return the state store supplier; never null
- */
- org.apache.kafka.streams.processor.StateStoreSupplier build();
- }
-
- /**
- * The interface used to create off-heap key-value stores that use a local
database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- */
- @Deprecated
- public interface PersistentKeyValueFactory<K, V> {
-
- /**
- * Set the persistent store as a windowed key-value store
- * @param windowSize size of the windows
- * @param retentionPeriod the maximum period of time in milli-second
to keep each window in this store
- * @param numSegments the maximum number of segments for rolling the
windowed store
- * @param retainDuplicates whether or not to retain duplicate data
within the window
- */
- PersistentKeyValueFactory<K, V> windowed(final long windowSize, long
retentionPeriod, int numSegments, boolean retainDuplicates);
-
- /**
- * Set the persistent store as a {@link SessionStore} for use with
{@link org.apache.kafka.streams.kstream.SessionWindows}
- * @param retentionPeriod period of time in milliseconds to keep each
window in this store
- */
- PersistentKeyValueFactory<K, V> sessionWindowed(final long
retentionPeriod);
-
- /**
- * Indicates that a changelog should be created for the store. The
changelog will be created
- * with the provided cleanupPolicy and configs.
- *
- * Note: Any unrecognized configs will be ignored.
- * @param config any configs that should be applied to the
changelog
- * @return the factory to create a persistent key-value store
- */
- PersistentKeyValueFactory<K, V> enableLogging(final Map<String,
String> config);
-
- /**
- * Indicates that a changelog should not be created for the key-value
store
- * @return the factory to create a persistent key-value store
- */
- PersistentKeyValueFactory<K, V> disableLogging();
-
- /**
- * Caching should be enabled on the created store.
- * @return the factory to create a persistent key-value store
- */
- PersistentKeyValueFactory<K, V> enableCaching();
-
- /**
- * Return the instance of StateStoreSupplier of new key-value store.
- * @return the key-value store; never null
- */
- org.apache.kafka.streams.processor.StateStoreSupplier build();
-
- }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
deleted file mode 100644
index f955421..0000000
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.KeyValueStore;
-
-import java.util.Map;
-
-/**
- * An in-memory key-value store based on a TreeMap.
- *
- * Note that the use of array-typed keys is discouraged because they result in
incorrect ordering behavior.
- * If you intend to work on byte arrays as key, for example, you may want to
wrap them with the {@code Bytes} class,
- * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code
RocksDBStore<byte[], ...>}.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
- */
-@Deprecated
-public class InMemoryKeyValueStoreSupplier<K, V> extends
AbstractStoreSupplier<K, V, KeyValueStore> {
-
- public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde,
Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {
- this(name, keySerde, valueSerde, null, logged, logConfig);
- }
-
- public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde,
Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) {
- super(name, keySerde, valueSerde, time, logged, logConfig);
- }
-
- public KeyValueStore get() {
- InMemoryKeyValueStore<K, V> store = new InMemoryKeyValueStore<>(name,
keySerde, valueSerde);
-
- return new MeteredKeyValueStore<>(logged ? store.enableLogging() :
store, "in-memory-state", time);
- }
-}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
deleted file mode 100644
index 0f897ba..0000000
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.KeyValueStore;
-
-import java.util.Map;
-
-/**
- * An in-memory key-value store that is limited in size and retains a maximum
number of most recently used entries.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- */
-@Deprecated
-public class InMemoryLRUCacheStoreSupplier<K, V> extends
AbstractStoreSupplier<K, V, KeyValueStore> {
-
- private final int capacity;
-
- public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K>
keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {
- this(name, capacity, keySerde, valueSerde, null, logged, logConfig);
- }
-
- private InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K>
keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String>
logConfig) {
- super(name, keySerde, valueSerde, time, logged, logConfig);
- this.capacity = capacity;
- }
-
- public KeyValueStore get() {
- MemoryNavigableLRUCache<K, V> cache = new
MemoryNavigableLRUCache<>(name, capacity, keySerde, valueSerde);
- return new MeteredKeyValueStore<>(logged ? cache.enableLogging() :
cache, "in-memory-lru-state", time);
- }
-
-}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index b99c907..1957aa4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -37,12 +37,9 @@ import java.util.Objects;
* * Note that the use of array-typed keys is discouraged because they result
in incorrect ordering behavior.
* If you intend to work on byte arrays as key, for example, you may want to
wrap them with the {@code Bytes} class,
* i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code
RocksDBStore<byte[], ...>}.
-
*
* @param <K> The key type
* @param <V> The value type
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
*/
public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index 4b233f0..3bc56c2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -27,7 +27,6 @@ import java.util.Map;
*
* @param <K> the type of keys
* @param <V> the type of values
- * @see org.apache.kafka.streams.state.Stores#create(String)
*/
@Deprecated
public class RocksDBKeyValueStoreSupplier<K, V> extends
AbstractStoreSupplier<K, V, KeyValueStore> {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
deleted file mode 100644
index 1552f7d..0000000
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.SessionStore;
-
-import java.util.Map;
-
-/**
- * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all
entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
- */
-@Deprecated
-public class RocksDBSessionStoreSupplier<K, V> extends
AbstractStoreSupplier<K, V, SessionStore> implements
WindowStoreSupplier<SessionStore> {
-
- static final int NUM_SEGMENTS = 3;
- private final long retentionPeriod;
- private final SessionStoreBuilder<K, V> builder;
-
- public RocksDBSessionStoreSupplier(String name, long retentionPeriod,
Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String>
logConfig, boolean cached) {
- super(name, keySerde, valueSerde, Time.SYSTEM, logged, logConfig);
- this.retentionPeriod = retentionPeriod;
- builder = new SessionStoreBuilder<>(new
RocksDbSessionBytesStoreSupplier(name,
-
retentionPeriod),
- keySerde,
- valueSerde,
- time);
- if (cached) {
- builder.withCachingEnabled();
- }
- // logged by default so we only need to worry about when it is
disabled.
- if (!logged) {
- builder.withLoggingDisabled();
- }
- }
-
- public SessionStore<K, V> get() {
- return builder.build();
-
- }
-
- public long retentionPeriod() {
- return retentionPeriod;
- }
-}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 2813041..d2b8cd2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -62,8 +62,6 @@ import java.util.Set;
* Note that the use of array-typed keys is discouraged because they result in
incorrect caching behavior.
* If you intend to work on byte arrays as key, for example, you may want to
wrap them with the {@code Bytes} class,
* i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code
RocksDBStore<byte[], ...>}.
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
*/
public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
deleted file mode 100644
index 2a82f79..0000000
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.WindowStore;
-
-import java.util.Map;
-
-/**
- * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all
entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
- */
-@Deprecated
-public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K,
V, WindowStore> implements WindowStoreSupplier<WindowStore> {
- public static final int MIN_SEGMENTS = 2;
- private final long retentionPeriod;
- private WindowStoreBuilder<K, V> builder;
-
- public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int
numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde,
long windowSize, boolean logged, Map<String, String> logConfig, boolean
enableCaching) {
- this(name, retentionPeriod, numSegments, retainDuplicates, keySerde,
valueSerde, Time.SYSTEM, windowSize, logged, logConfig, enableCaching);
- }
-
- public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int
numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde,
Time time, long windowSize, boolean logged, Map<String, String> logConfig,
boolean enableCaching) {
- super(name, keySerde, valueSerde, time, logged, logConfig);
- if (numSegments < MIN_SEGMENTS) {
- throw new IllegalArgumentException("numSegments must be >= " +
MIN_SEGMENTS);
- }
- this.retentionPeriod = retentionPeriod;
- builder = new WindowStoreBuilder<>(new
RocksDbWindowBytesStoreSupplier(name,
-
retentionPeriod,
-
numSegments,
-
windowSize,
-
retainDuplicates),
- keySerde,
- valueSerde,
- time);
- if (enableCaching) {
- builder.withCachingEnabled();
- }
- // logged by default so we only need to worry about when it is
disabled.
- if (!logged) {
- builder.withLoggingDisabled();
- }
- }
-
- public WindowStore<K, V> get() {
- return builder.build();
- }
-
- @Override
- public long retentionPeriod() {
- return retentionPeriod;
- }
-
-}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index b9b7181..5a87bc5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -25,6 +25,8 @@ public class RocksDbSessionBytesStoreSupplier implements
SessionBytesStoreSuppli
private final String name;
private final long retentionPeriod;
+ private static final int NUM_SEGMENTS = 3;
+
public RocksDbSessionBytesStoreSupplier(final String name,
final long retentionPeriod) {
this.name = name;
@@ -36,13 +38,12 @@ public class RocksDbSessionBytesStoreSupplier implements
SessionBytesStoreSuppli
return name;
}
- @SuppressWarnings("deprecation")
@Override
public SessionStore<Bytes, byte[]> get() {
final RocksDBSegmentedBytesStore segmented = new
RocksDBSegmentedBytesStore(
name,
retentionPeriod,
-
org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS,
+ NUM_SEGMENTS,
new SessionKeySchema());
return new RocksDBSessionStore<>(segmented, Serdes.Bytes(),
Serdes.ByteArray());
}
@@ -52,11 +53,10 @@ public class RocksDbSessionBytesStoreSupplier implements
SessionBytesStoreSuppli
return "rocksdb-session";
}
- @SuppressWarnings("deprecation")
@Override
public long segmentIntervalMs() {
return Segments.segmentInterval(
retentionPeriod,
-
org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS);
+ NUM_SEGMENTS);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index e1521f8..5fbf491 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -28,14 +28,15 @@ public class RocksDbWindowBytesStoreSupplier implements
WindowBytesStoreSupplier
private final long windowSize;
private final boolean retainDuplicates;
- @SuppressWarnings("deprecation")
+ private static final int MIN_SEGMENTS = 2;
+
public RocksDbWindowBytesStoreSupplier(final String name,
final long retentionPeriod,
final int segments,
final long windowSize,
final boolean retainDuplicates) {
- if (segments <
org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS)
{
- throw new IllegalArgumentException("numSegments must be >= " +
org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS);
+ if (segments < MIN_SEGMENTS) {
+ throw new IllegalArgumentException("numSegments must be >= " +
MIN_SEGMENTS);
}
this.name = name;
this.retentionPeriod = retentionPeriod;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index b947664..8c3716b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -33,8 +33,6 @@ import java.util.NoSuchElementException;
/**
* An in-memory LRU cache store similar to {@link MemoryLRUCache} but
byte-based, not
* record based
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
*/
public class ThreadCache {
private final Logger log;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
new file mode 100644
index 0000000..f106766
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+/**
+ * This class allows access to the {@link InternalTopologyBuilder} of a {@link
Topology} object.
+ *
+ */
+public class TopologyWrapper extends Topology {
+
+ public InternalTopologyBuilder getInternalBuilder() {
+ return internalTopologyBuilder;
+ }
+
+ public void setApplicationId(String applicationId) {
+ internalTopologyBuilder.setApplicationId(applicationId);
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index d0361dc..e5160e1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -29,17 +29,17 @@ import
org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -136,8 +136,6 @@ public class RegexSourceIntegrationTest {
final List<String> expectedFirstAssignment =
Arrays.asList("TEST-TOPIC-1");
final List<String> expectedSecondAssignment =
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
- final StreamsConfig streamsConfig = new
StreamsConfig(streamsConfiguration);
-
CLUSTER.createTopic("TEST-TOPIC-1");
final StreamsBuilder builder = new StreamsBuilder();
@@ -227,28 +225,27 @@ public class RegexSourceIntegrationTest {
}, STREAM_TASKS_NOT_UPDATED);
}
- @SuppressWarnings("deprecation")
@Test
public void shouldAddStateStoreToRegexDefinedSource() throws
InterruptedException {
final ProcessorSupplier<String, String> processorSupplier = new
MockProcessorSupplier<>();
- final MockStateStoreSupplier stateStoreSupplier = new
MockStateStoreSupplier("testStateStore", false);
+ final StoreBuilder storeBuilder =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("testStateStore"),
Serdes.String(), Serdes.String());
final long thirtySecondTimeout = 30 * 1000;
- final TopologyBuilder builder = new TopologyBuilder()
- .addSource("ingest", Pattern.compile("topic-\\d+"))
- .addProcessor("my-processor", processorSupplier, "ingest")
- .addStateStore(stateStoreSupplier, "my-processor");
+ final TopologyWrapper topology = new TopologyWrapper();
+ topology.addSource("ingest", Pattern.compile("topic-\\d+"));
+ topology.addProcessor("my-processor", processorSupplier, "ingest");
+ topology.addStateStore(storeBuilder, "my-processor");
+ streams = new KafkaStreams(topology, streamsConfiguration);
- streams = new KafkaStreams(builder, streamsConfiguration);
try {
streams.start();
final TestCondition stateStoreNameBoundToSourceTopic = new
TestCondition() {
@Override
public boolean conditionMet() {
- final Map<String, List<String>> stateStoreToSourceTopic =
builder.stateStoreNameToSourceTopics();
+ final Map<String, List<String>> stateStoreToSourceTopic =
topology.getInternalBuilder().stateStoreNameToSourceTopics();
final List<String> topicNamesList =
stateStoreToSourceTopic.get("testStateStore");
return topicNamesList != null && !topicNamesList.isEmpty()
&& topicNamesList.get(0).equals("topic-1");
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 255c3eb..27f0833 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -18,9 +18,9 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.errors.TopologyBuilderException;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 8cb2eae..afc9be1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -31,7 +31,8 @@ import
org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
@@ -43,7 +44,6 @@ import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
@@ -106,16 +106,16 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
private void initStore(final boolean enableCaching) {
- final RocksDBSessionStoreSupplier<String, Long> supplier =
- new RocksDBSessionStoreSupplier<>(
- STORE_NAME,
- GAP_MS * 3,
+ final StoreBuilder<SessionStore<String, Long>> storeBuilder =
Stores.sessionStoreBuilder(Stores.persistentSessionStore(STORE_NAME, GAP_MS *
3),
Serdes.String(),
- Serdes.Long(),
- false,
- Collections.<String, String>emptyMap(),
- enableCaching);
- sessionStore = supplier.get();
+ Serdes.Long())
+ .withLoggingDisabled();
+
+ if (enableCaching) {
+ storeBuilder.withCachingEnabled();
+ }
+
+ sessionStore = storeBuilder.build();
sessionStore.init(context, sessionStore);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index d1d25e9..93b233b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -31,8 +31,6 @@ import
org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import
org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
@@ -547,23 +545,6 @@ public class TopologyBuilderTest {
assertEquals(Collections.singletonList("appId-internal-topic"),
stateStoreNameToSourceTopic.get("store"));
}
- @SuppressWarnings("unchecked")
- @Test
- public void shouldAddInternalTopicConfigForWindowStores() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.setApplicationId("appId");
- builder.addSource("source", "topic");
- builder.addProcessor("processor", new MockProcessorSupplier(),
"source");
- builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000,
3, false, null, null, 10000, true, Collections.<String, String>emptyMap(),
false), "processor");
- final Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
- final TopicsInfo topicsInfo = topicGroups.values().iterator().next();
- final InternalTopicConfig topicConfig =
topicsInfo.stateChangelogTopics.get("appId-store-changelog");
- final Map<String, String> properties =
topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
- assertEquals(2, properties.size());
- assertEquals("40000", properties.get(TopicConfig.RETENTION_MS_CONFIG));
- assertEquals("appId-store-changelog", topicConfig.name());
- }
-
@Test
public void shouldAddInternalTopicConfigForNonWindowStores() {
final TopologyBuilder builder = new TopologyBuilder();
@@ -594,7 +575,7 @@ public class TopologyBuilderTest {
}
@Test
- public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
+ public void shouldThrowOnUnassignedStateStoreAccess() {
final String sourceNodeName = "source";
final String goodNodeName = "goodGuy";
final String badNodeName = "badGuy";
@@ -603,12 +584,11 @@ public class TopologyBuilderTest {
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
config.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getAbsolutePath());
- final StreamsConfig streamsConfig = new StreamsConfig(config);
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource(sourceNodeName, "topic")
.addProcessor(goodNodeName, new LocalMockProcessorSupplier(),
sourceNodeName)
-
.addStateStore(Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
goodNodeName)
+ .addStateStore(new
MockStateStoreSupplier(LocalMockProcessorSupplier.STORE_NAME, false),
goodNodeName)
.addProcessor(badNodeName, new LocalMockProcessorSupplier(),
sourceNodeName);
try {
final TopologyTestDriverWrapper driver = new
TopologyTestDriverWrapper(builder.internalTopologyBuilder, config);
@@ -724,6 +704,7 @@ public class TopologyBuilderTest {
assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(),
instanceOf(MockTimestampExtractor.class));
}
+ @SuppressWarnings("unchecked")
@Test
public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception
{
@@ -755,10 +736,11 @@ public class TopologyBuilderTest {
assertFalse(topics.contains("topic-A"));
}
+ @SuppressWarnings("unchecked")
@Test(expected = TopologyBuilderException.class)
public void
shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
final String sameNameForSourceAndProcessor = "sameName";
- final TopologyBuilder topologyBuilder = new TopologyBuilder()
+ new TopologyBuilder()
.addGlobalStore(new MockStateStoreSupplier("anyName", false,
false),
sameNameForSourceAndProcessor,
null,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 149a158..c73593e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -29,11 +29,10 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
@@ -63,8 +62,9 @@ import static org.junit.Assert.fail;
public class InternalTopologyBuilderTest {
- private final InternalTopologyBuilder builder = new
InternalTopologyBuilder();
private final Serde<String> stringSerde = Serdes.String();
+ private final InternalTopologyBuilder builder = new
InternalTopologyBuilder();
+ private final StoreBuilder storeBuilder =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"),
Serdes.ByteArray(), Serdes.ByteArray());
@Test
public void shouldAddSourceWithOffsetReset() {
@@ -266,14 +266,14 @@ public class InternalTopologyBuilderTest {
@Test(expected = TopologyException.class)
public void testAddStateStoreWithNonExistingProcessor() {
- builder.addStateStore(new MockStateStoreSupplier("store", false),
"no-such-processsor");
+ builder.addStateStore(storeBuilder, "no-such-processsor");
}
@Test
public void testAddStateStoreWithSource() {
builder.addSource(null, "source-1", null, null, null, "topic-1");
try {
- builder.addStateStore(new MockStateStoreSupplier("store", false),
"source-1");
+ builder.addStateStore(storeBuilder, "source-1");
fail("Should throw TopologyException with store cannot be added to
source");
} catch (final TopologyException expected) { /* ok */ }
}
@@ -282,36 +282,34 @@ public class InternalTopologyBuilderTest {
public void testAddStateStoreWithSink() {
builder.addSink("sink-1", "topic-1", null, null, null);
try {
- builder.addStateStore(new MockStateStoreSupplier("store", false),
"sink-1");
+ builder.addStateStore(storeBuilder, "sink-1");
fail("Should throw TopologyException with store cannot be added to
sink");
} catch (final TopologyException expected) { /* ok */ }
}
@Test
public void testAddStateStoreWithDuplicates() {
- builder.addStateStore(new MockStateStoreSupplier("store", false));
+ builder.addStateStore(storeBuilder);
try {
- builder.addStateStore(new MockStateStoreSupplier("store", false));
+ builder.addStateStore(storeBuilder);
fail("Should throw TopologyException with store name conflict");
} catch (final TopologyException expected) { /* ok */ }
}
- @SuppressWarnings("deprecation")
@Test
public void testAddStateStore() {
- final StateStoreSupplier supplier = new
MockStateStoreSupplier("store-1", false);
- builder.addStateStore(supplier);
+ builder.addStateStore(storeBuilder);
builder.setApplicationId("X");
builder.addSource(null, "source-1", null, null, null, "topic-1");
builder.addProcessor("processor-1", new MockProcessorSupplier(),
"source-1");
assertEquals(0, builder.build(null).stateStores().size());
- builder.connectProcessorAndStateStores("processor-1", "store-1");
+ builder.connectProcessorAndStateStores("processor-1",
storeBuilder.name());
final List<StateStore> suppliers = builder.build(null).stateStores();
assertEquals(1, suppliers.size());
- assertEquals(supplier.name(), suppliers.get(0).name());
+ assertEquals(storeBuilder.name(), suppliers.get(0).name());
}
@Test
@@ -346,7 +344,6 @@ public class InternalTopologyBuilderTest {
assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new
HashSet<>(copartitionGroups));
}
- @SuppressWarnings("deprecation")
@Test
public void testTopicGroupsByStateStore() {
builder.setApplicationId("X");
@@ -358,15 +355,14 @@ public class InternalTopologyBuilderTest {
builder.addProcessor("processor-1", new MockProcessorSupplier(),
"source-1");
builder.addProcessor("processor-2", new MockProcessorSupplier(),
"source-2");
- builder.addStateStore(new MockStateStoreSupplier("store-1", false),
"processor-1", "processor-2");
+
builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-1"),
Serdes.ByteArray(), Serdes.ByteArray()), "processor-1", "processor-2");
builder.addProcessor("processor-3", new MockProcessorSupplier(),
"source-3");
builder.addProcessor("processor-4", new MockProcessorSupplier(),
"source-4");
- builder.addStateStore(new MockStateStoreSupplier("store-2", false),
"processor-3", "processor-4");
+
builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-2"),
Serdes.ByteArray(), Serdes.ByteArray()), "processor-3", "processor-4");
builder.addProcessor("processor-5", new MockProcessorSupplier(),
"source-5");
- final StateStoreSupplier supplier = new
MockStateStoreSupplier("store-3", false);
- builder.addStateStore(supplier);
+
builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-3"),
Serdes.ByteArray(), Serdes.ByteArray()));
builder.connectProcessorAndStateStores("processor-5", "store-3");
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups =
builder.topicGroups();
@@ -415,17 +411,17 @@ public class InternalTopologyBuilderTest {
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullNameWhenAddingSink() throws Exception {
+ public void shouldNotAllowNullNameWhenAddingSink() {
builder.addSink(null, "topic", null, null, null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullTopicWhenAddingSink() throws Exception {
+ public void shouldNotAllowNullTopicWhenAddingSink() {
builder.addSink("name", null, null, null, null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullNameWhenAddingProcessor() throws Exception {
+ public void shouldNotAllowNullNameWhenAddingProcessor() {
builder.addProcessor(null, new ProcessorSupplier() {
@Override
public Processor get() {
@@ -435,39 +431,38 @@ public class InternalTopologyBuilderTest {
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullProcessorSupplier() throws Exception {
+ public void shouldNotAllowNullProcessorSupplier() {
builder.addProcessor("name", null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullNameWhenAddingSource() throws Exception {
+ public void shouldNotAllowNullNameWhenAddingSource() {
builder.addSource(null, null, null, null, null, Pattern.compile(".*"));
}
@Test(expected = NullPointerException.class)
- public void
shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() throws
Exception {
+ public void
shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
builder.connectProcessorAndStateStores(null, "store");
}
@Test(expected = NullPointerException.class)
- public void
shouldNotAllowNullStateStoreNameWhenConnectingProcessorAndStateStores() throws
Exception {
+ public void
shouldNotAllowNullStateStoreNameWhenConnectingProcessorAndStateStores() {
builder.connectProcessorAndStateStores("processor", new
String[]{null});
}
@Test(expected = NullPointerException.class)
- public void shouldNotAddNullInternalTopic() throws Exception {
+ public void shouldNotAddNullInternalTopic() {
builder.addInternalTopic(null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotSetApplicationIdToNull() throws Exception {
+ public void shouldNotSetApplicationIdToNull() {
builder.setApplicationId(null);
}
- @SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
- public void shouldNotAddNullStateStoreSupplier() throws Exception {
- builder.addStateStore((StateStoreSupplier) null);
+ public void shouldNotAddNullStateStoreSupplier() {
+ builder.addStateStore((StoreBuilder) null);
}
private Set<String> nodeNames(final Collection<ProcessorNode> nodes) {
@@ -479,44 +474,43 @@ public class InternalTopologyBuilderTest {
}
@Test
- public void
shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() throws
Exception {
+ public void
shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() {
builder.addSource(null, "source", null, null, null, "topic");
builder.addProcessor("processor", new MockProcessorSupplier(),
"source");
- builder.addStateStore(new MockStateStoreSupplier("store", false),
"processor");
+ builder.addStateStore(storeBuilder, "processor");
final Map<String, List<String>> stateStoreNameToSourceTopic =
builder.stateStoreNameToSourceTopics();
assertEquals(1, stateStoreNameToSourceTopic.size());
assertEquals(Collections.singletonList("topic"),
stateStoreNameToSourceTopic.get("store"));
}
@Test
- public void
shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() throws
Exception {
+ public void
shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() {
builder.addSource(null, "source", null, null, null, "topic");
builder.addProcessor("processor", new MockProcessorSupplier(),
"source");
- builder.addStateStore(new MockStateStoreSupplier("store", false),
"processor");
+ builder.addStateStore(storeBuilder, "processor");
final Map<String, List<String>> stateStoreNameToSourceTopic =
builder.stateStoreNameToSourceTopics();
assertEquals(1, stateStoreNameToSourceTopic.size());
assertEquals(Collections.singletonList("topic"),
stateStoreNameToSourceTopic.get("store"));
}
@Test
- public void shouldCorrectlyMapStateStoreToInternalTopics() throws
Exception {
+ public void shouldCorrectlyMapStateStoreToInternalTopics() {
builder.setApplicationId("appId");
builder.addInternalTopic("internal-topic");
builder.addSource(null, "source", null, null, null, "internal-topic");
builder.addProcessor("processor", new MockProcessorSupplier(),
"source");
- builder.addStateStore(new MockStateStoreSupplier("store", false),
"processor");
+ builder.addStateStore(storeBuilder, "processor");
final Map<String, List<String>> stateStoreNameToSourceTopic =
builder.stateStoreNameToSourceTopics();
assertEquals(1, stateStoreNameToSourceTopic.size());
assertEquals(Collections.singletonList("appId-internal-topic"),
stateStoreNameToSourceTopic.get("store"));
}
- @SuppressWarnings("unchecked")
@Test
- public void shouldAddInternalTopicConfigForWindowStores() throws Exception
{
+ public void shouldAddInternalTopicConfigForWindowStores() {
builder.setApplicationId("appId");
builder.addSource(null, "source", null, null, null, "topic");
builder.addProcessor("processor", new MockProcessorSupplier(),
"source");
- builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000,
3, false, null, null, 10000, true, Collections.<String, String>emptyMap(),
false), "processor");
+
builder.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("store",
30000, 3, 10000, false), Serdes.String(), Serdes.String()), "processor");
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups =
builder.topicGroups();
final InternalTopologyBuilder.TopicsInfo topicsInfo =
topicGroups.values().iterator().next();
final InternalTopicConfig topicConfig =
topicsInfo.stateChangelogTopics.get("appId-store-changelog");
@@ -528,13 +522,12 @@ public class InternalTopologyBuilderTest {
assertTrue(topicConfig instanceof WindowedChangelogTopicConfig);
}
- @SuppressWarnings("unchecked")
@Test
- public void shouldAddInternalTopicConfigForNonWindowStores() throws
Exception {
+ public void shouldAddInternalTopicConfigForNonWindowStores() {
builder.setApplicationId("appId");
builder.addSource(null, "source", null, null, null, "topic");
builder.addProcessor("processor", new MockProcessorSupplier(),
"source");
- builder.addStateStore(new MockStateStoreSupplier("store", true),
"processor");
+ builder.addStateStore(storeBuilder, "processor");
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups =
builder.topicGroups();
final InternalTopologyBuilder.TopicsInfo topicsInfo =
topicGroups.values().iterator().next();
final InternalTopicConfig topicConfig =
topicsInfo.stateChangelogTopics.get("appId-store-changelog");
@@ -545,9 +538,8 @@ public class InternalTopologyBuilderTest {
assertTrue(topicConfig instanceof UnwindowedChangelogTopicConfig);
}
- @SuppressWarnings("unchecked")
@Test
- public void shouldAddInternalTopicConfigForRepartitionTopics() throws
Exception {
+ public void shouldAddInternalTopicConfigForRepartitionTopics() {
builder.setApplicationId("appId");
builder.addInternalTopic("foo");
builder.addSource(null, "source", null, null, null, "foo");
@@ -561,7 +553,6 @@ public class InternalTopologyBuilderTest {
assertTrue(topicConfig instanceof RepartitionTopicConfig);
}
- @SuppressWarnings("deprecation")
@Test
public void shouldThrowOnUnassignedStateStoreAccess() {
final String sourceNodeName = "source";
@@ -572,12 +563,11 @@ public class InternalTopologyBuilderTest {
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
config.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getAbsolutePath());
- final StreamsConfig streamsConfig = new StreamsConfig(config);
builder.addSource(null, sourceNodeName, null, null, null, "topic");
builder.addProcessor(goodNodeName, new LocalMockProcessorSupplier(),
sourceNodeName);
builder.addStateStore(
-
Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
+
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(LocalMockProcessorSupplier.STORE_NAME),
Serdes.String(), Serdes.String()),
goodNodeName);
builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(),
sourceNodeName);
@@ -641,17 +631,15 @@ public class InternalTopologyBuilderTest {
}
- @SuppressWarnings("unchecked")
@Test
- public void shouldAddTimestampExtractorPerSource() throws Exception {
+ public void shouldAddTimestampExtractorPerSource() {
builder.addSource(null, "source", new MockTimestampExtractor(), null,
null, "topic");
final ProcessorTopology processorTopology = builder.build(null);
assertThat(processorTopology.source("topic").getTimestampExtractor(),
instanceOf(MockTimestampExtractor.class));
}
- @SuppressWarnings("unchecked")
@Test
- public void shouldAddTimestampExtractorWithPatternPerSource() throws
Exception {
+ public void shouldAddTimestampExtractorWithPatternPerSource() {
final Pattern pattern = Pattern.compile("t.*");
builder.addSource(null, "source", new MockTimestampExtractor(), null,
null, pattern);
final ProcessorTopology processorTopology = builder.build(null);
@@ -659,7 +647,7 @@ public class InternalTopologyBuilderTest {
}
@Test
- public void shouldSortProcessorNodesCorrectly() throws Exception {
+ public void shouldSortProcessorNodesCorrectly() {
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor1", new MockProcessorSupplier(),
"source1");
@@ -702,11 +690,12 @@ public class InternalTopologyBuilderTest {
assertEquals(1, node.size);
}
+ @SuppressWarnings("unchecked")
@Test
public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception
{
builder.addSource(null, "ingest", null, null, null,
Pattern.compile("topic-\\d+"));
builder.addProcessor("my-processor", new MockProcessorSupplier(),
"ingest");
- builder.addStateStore(new MockStateStoreSupplier("testStateStore",
false), "my-processor");
+ builder.addStateStore(storeBuilder, "my-processor");
final InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates
= new InternalTopologyBuilder.SubscriptionUpdates();
final Field updatedTopicsField =
subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
@@ -722,7 +711,7 @@ public class InternalTopologyBuilderTest {
builder.setApplicationId("test-app");
final Map<String, List<String>> stateStoreAndTopics =
builder.stateStoreNameToSourceTopics();
- final List<String> topics = stateStoreAndTopics.get("testStateStore");
+ final List<String> topics =
stateStoreAndTopics.get(storeBuilder.name());
assertTrue("Expected to contain two topics", topics.size() == 2);
@@ -731,11 +720,12 @@ public class InternalTopologyBuilderTest {
assertFalse(topics.contains("topic-A"));
}
+ @SuppressWarnings("unchecked")
@Test(expected = TopologyException.class)
public void
shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
final String sameNameForSourceAndProcessor = "sameName";
builder.addGlobalStore(
- new MockStateStoreSupplier("anyName", false, false),
+ (StoreBuilder<KeyValueStore>) storeBuilder,
sameNameForSourceAndProcessor,
null,
null,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 51d4e05..9f2b242 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -25,15 +25,14 @@ import
org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -64,7 +63,7 @@ public class ProcessorTopologyTest {
private static final String OUTPUT_TOPIC_2 = "output-topic-2";
private static final String THROUGH_TOPIC_1 = "through-topic-1";
- private final TopologyBuilder builder = new TopologyBuilder();
+ private final TopologyWrapper topology = new TopologyWrapper();
private final MockProcessorSupplier mockProcessorSupplier = new
MockProcessorSupplier();
private final ConsumerRecordFactory<String, String> recordFactory = new
ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER, 0L);
@@ -94,36 +93,36 @@ public class ProcessorTopologyTest {
@Test
public void testTopologyMetadata() {
- builder.setApplicationId("X");
+ topology.setApplicationId("X");
- builder.addSource("source-1", "topic-1");
- builder.addSource("source-2", "topic-2", "topic-3");
- builder.addProcessor("processor-1", new MockProcessorSupplier<>(),
"source-1");
- builder.addProcessor("processor-2", new MockProcessorSupplier<>(),
"source-1", "source-2");
- builder.addSink("sink-1", "topic-3", "processor-1");
- builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");
+ topology.addSource("source-1", "topic-1");
+ topology.addSource("source-2", "topic-2", "topic-3");
+ topology.addProcessor("processor-1", new MockProcessorSupplier<>(),
"source-1");
+ topology.addProcessor("processor-2", new MockProcessorSupplier<>(),
"source-1", "source-2");
+ topology.addSink("sink-1", "topic-3", "processor-1");
+ topology.addSink("sink-2", "topic-4", "processor-1", "processor-2");
- final ProcessorTopology topology = builder.build(null);
+ final ProcessorTopology processorTopology =
topology.getInternalBuilder().build();
- assertEquals(6, topology.processors().size());
+ assertEquals(6, processorTopology.processors().size());
- assertEquals(2, topology.sources().size());
+ assertEquals(2, processorTopology.sources().size());
- assertEquals(3, topology.sourceTopics().size());
+ assertEquals(3, processorTopology.sourceTopics().size());
- assertNotNull(topology.source("topic-1"));
+ assertNotNull(processorTopology.source("topic-1"));
- assertNotNull(topology.source("topic-2"));
+ assertNotNull(processorTopology.source("topic-2"));
- assertNotNull(topology.source("topic-3"));
+ assertNotNull(processorTopology.source("topic-3"));
- assertEquals(topology.source("topic-2"), topology.source("topic-3"));
+ assertEquals(processorTopology.source("topic-2"),
processorTopology.source("topic-3"));
}
@Test
public void testDrivingSimpleTopology() {
int partition = 10;
- driver = new
TopologyTestDriverWrapper(createSimpleTopology(partition).internalTopologyBuilder,
props);
+ driver = new
TopologyTestDriverWrapper(createSimpleTopology(partition), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1",
"value1"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
assertNoOutputRecord(OUTPUT_TOPIC_2);
@@ -144,7 +143,7 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingMultiplexingTopology() {
- driver = new
TopologyTestDriverWrapper(createMultiplexingTopology().internalTopologyBuilder,
props);
+ driver = new TopologyTestDriverWrapper(createMultiplexingTopology(),
props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1",
"value1"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
@@ -166,7 +165,7 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingMultiplexByNameTopology() {
- driver = new
TopologyTestDriverWrapper(createMultiplexByNameTopology().internalTopologyBuilder,
props);
+ driver = new
TopologyTestDriverWrapper(createMultiplexByNameTopology(), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1",
"value1"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
@@ -189,7 +188,7 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingStatefulTopology() {
String storeName = "entries";
- driver = new
TopologyTestDriverWrapper(createStatefulTopology(storeName).internalTopologyBuilder,
props);
+ driver = new
TopologyTestDriverWrapper(createStatefulTopology(storeName), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1",
"value1"));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2",
"value2"));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3",
"value3"));
@@ -203,19 +202,17 @@ public class ProcessorTopologyTest {
assertNull(store.get("key4"));
}
- @SuppressWarnings("unchecked")
@Test
public void shouldDriveGlobalStore() {
final String storeName = "my-store";
- final StateStoreSupplier storeSupplier = Stores.create(storeName)
-
.withStringKeys().withStringValues().inMemory().disableLogging().build();
final String global = "global";
final String topic = "topic";
- final TopologyBuilder topologyBuilder = this.builder
- .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER,
STRING_DESERIALIZER, topic, "processor", define(new
StatefulProcessor(storeName)));
- driver = new
TopologyTestDriverWrapper(topologyBuilder.internalTopologyBuilder, props);
- final KeyValueStore<String, String> globalStore =
(KeyValueStore<String, String>)
topologyBuilder.globalStateStores().get("my-store");
+
topology.addGlobalStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
Serdes.String(), Serdes.String()).withLoggingDisabled(),
+ global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic,
"processor", define(new StatefulProcessor(storeName)));
+
+ driver = new TopologyTestDriverWrapper(topology.getInternalBuilder(),
props);
+ final KeyValueStore<String, String> globalStore =
driver.getKeyValueStore(storeName);
driver.pipeInput(recordFactory.create(topic, "key1", "value1"));
driver.pipeInput(recordFactory.create(topic, "key2", "value2"));
assertEquals("value1", globalStore.get("key1"));
@@ -225,7 +222,7 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingSimpleMultiSourceTopology() {
final int partition = 10;
- driver = new
TopologyTestDriverWrapper(createSimpleMultiSourceTopology(partition).internalTopologyBuilder,
props);
+ driver = new
TopologyTestDriverWrapper(createSimpleMultiSourceTopology(partition), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1",
"value1"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
@@ -238,7 +235,7 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingForwardToSourceTopology() {
- driver = new
TopologyTestDriverWrapper(createForwardToSourceTopology().internalTopologyBuilder,
props);
+ driver = new
TopologyTestDriverWrapper(createForwardToSourceTopology(), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1",
"value1"));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2",
"value2"));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3",
"value3"));
@@ -249,7 +246,7 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingInternalRepartitioningTopology() {
- driver = new
TopologyTestDriverWrapper(createInternalRepartitioningTopology().internalTopologyBuilder,
props);
+ driver = new
TopologyTestDriverWrapper(createInternalRepartitioningTopology(), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1",
"value1"));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2",
"value2"));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3",
"value3"));
@@ -260,7 +257,7 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingInternalRepartitioningForwardingTimestampTopology()
{
- driver = new
TopologyTestDriverWrapper(createInternalRepartitioningWithValueTimestampTopology().internalTopologyBuilder,
props);
+ driver = new
TopologyTestDriverWrapper(createInternalRepartitioningWithValueTimestampTopology(),
props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1",
"value1@1000"));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2",
"value2@2000"));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3",
"value3@3000"));
@@ -274,29 +271,29 @@ public class ProcessorTopologyTest {
@Test
public void shouldCreateStringWithSourceAndTopics() {
- builder.addSource("source", "topic1", "topic2");
- final ProcessorTopology topology = builder.build(null);
- final String result = topology.toString();
+ topology.addSource("source", "topic1", "topic2");
+ final ProcessorTopology processorTopology =
topology.getInternalBuilder().build();
+ final String result = processorTopology.toString();
assertThat(result, containsString("source:\n\t\ttopics:\t\t[topic1,
topic2]\n"));
}
@Test
public void shouldCreateStringWithMultipleSourcesAndTopics() {
- builder.addSource("source", "topic1", "topic2");
- builder.addSource("source2", "t", "t1", "t2");
- final ProcessorTopology topology = builder.build(null);
- final String result = topology.toString();
+ topology.addSource("source", "topic1", "topic2");
+ topology.addSource("source2", "t", "t1", "t2");
+ final ProcessorTopology processorTopology =
topology.getInternalBuilder().build();
+ final String result = processorTopology.toString();
assertThat(result, containsString("source:\n\t\ttopics:\t\t[topic1,
topic2]\n"));
assertThat(result, containsString("source2:\n\t\ttopics:\t\t[t, t1,
t2]\n"));
}
@Test
public void shouldCreateStringWithProcessors() {
- builder.addSource("source", "t")
+ topology.addSource("source", "t")
.addProcessor("processor", mockProcessorSupplier, "source")
.addProcessor("other", mockProcessorSupplier, "source");
- final ProcessorTopology topology = builder.build(null);
- final String result = topology.toString();
+ final ProcessorTopology processorTopology =
topology.getInternalBuilder().build();
+ final String result = processorTopology.toString();
assertThat(result, containsString("\t\tchildren:\t[processor,
other]"));
assertThat(result, containsString("processor:\n"));
assertThat(result, containsString("other:\n"));
@@ -304,14 +301,14 @@ public class ProcessorTopologyTest {
@Test
public void shouldRecursivelyPrintChildren() {
- builder.addSource("source", "t")
+ topology.addSource("source", "t")
.addProcessor("processor", mockProcessorSupplier, "source")
.addProcessor("child-one", mockProcessorSupplier, "processor")
.addProcessor("child-one-one", mockProcessorSupplier,
"child-one")
.addProcessor("child-two", mockProcessorSupplier, "processor")
.addProcessor("child-two-one", mockProcessorSupplier,
"child-two");
- final String result = builder.build(null).toString();
+ final String result = topology.getInternalBuilder().build().toString();
assertThat(result,
containsString("child-one:\n\t\tchildren:\t[child-one-one]"));
assertThat(result,
containsString("child-two:\n\t\tchildren:\t[child-two-one]"));
}
@@ -319,7 +316,7 @@ public class ProcessorTopologyTest {
@Test
public void shouldConsiderTimeStamps() {
final int partition = 10;
- driver = new
TopologyTestDriverWrapper(createSimpleTopology(partition).internalTopologyBuilder,
props);
+ driver = new
TopologyTestDriverWrapper(createSimpleTopology(partition), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1",
10L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2",
20L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3",
30L));
@@ -331,7 +328,7 @@ public class ProcessorTopologyTest {
@Test
public void shouldConsiderModifiedTimeStamps() {
final int partition = 10;
- driver = new
TopologyTestDriverWrapper(createTimestampTopology(partition).internalTopologyBuilder,
props);
+ driver = new
TopologyTestDriverWrapper(createTimestampTopology(partition), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1",
10L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2",
20L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3",
30L));
@@ -379,80 +376,92 @@ public class ProcessorTopologyTest {
};
}
- private TopologyBuilder createSimpleTopology(final int partition) {
- return builder
+ private InternalTopologyBuilder createSimpleTopology(final int partition) {
+ return ((TopologyWrapper) topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
.addProcessor("processor", define(new ForwardingProcessor()),
"source")
- .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition),
"processor");
+ .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition),
"processor"))
+ .getInternalBuilder();
}
- private TopologyBuilder createTimestampTopology(final int partition) {
- return builder
+ private InternalTopologyBuilder createTimestampTopology(final int
partition) {
+ return ((TopologyWrapper) topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
.addProcessor("processor", define(new TimestampProcessor()),
"source")
- .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition),
"processor");
+ .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition),
"processor"))
+ .getInternalBuilder();
}
- private TopologyBuilder createMultiplexingTopology() {
- return builder
+ private InternalTopologyBuilder createMultiplexingTopology() {
+ return ((TopologyWrapper) topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
.addProcessor("processor", define(new MultiplexingProcessor(2)),
"source")
.addSink("sink1", OUTPUT_TOPIC_1, "processor")
- .addSink("sink2", OUTPUT_TOPIC_2, "processor");
+ .addSink("sink2", OUTPUT_TOPIC_2, "processor"))
+ .getInternalBuilder();
}
- private TopologyBuilder createMultiplexByNameTopology() {
- return builder
+ private InternalTopologyBuilder createMultiplexByNameTopology() {
+ return ((TopologyWrapper) topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
.addProcessor("processor", define(new
MultiplexByNameProcessor(2)), "source")
.addSink("sink0", OUTPUT_TOPIC_1, "processor")
- .addSink("sink1", OUTPUT_TOPIC_2, "processor");
+ .addSink("sink1", OUTPUT_TOPIC_2, "processor"))
+ .getInternalBuilder();
}
- private TopologyBuilder createStatefulTopology(final String storeName) {
- return builder
+ private InternalTopologyBuilder createStatefulTopology(final String
storeName) {
+ return ((TopologyWrapper) topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
.addProcessor("processor", define(new
StatefulProcessor(storeName)), "source")
- .addStateStore(
-
Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
- "processor"
- )
- .addSink("counts", OUTPUT_TOPIC_1, "processor");
+
.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
Serdes.String(), Serdes.String()), "processor")
+ .addSink("counts", OUTPUT_TOPIC_1, "processor"))
+ .getInternalBuilder();
}
- private TopologyBuilder createInternalRepartitioningTopology() {
- return builder
+ private InternalTopologyBuilder createInternalRepartitioningTopology() {
+ final InternalTopologyBuilder internalTopologyBuilder =
((TopologyWrapper) topology
.addSource("source", INPUT_TOPIC_1)
- .addInternalTopic(THROUGH_TOPIC_1)
.addSink("sink0", THROUGH_TOPIC_1, "source")
.addSource("source1", THROUGH_TOPIC_1)
- .addSink("sink1", OUTPUT_TOPIC_1, "source1");
+ .addSink("sink1", OUTPUT_TOPIC_1, "source1"))
+ .getInternalBuilder();
+
+ internalTopologyBuilder.addInternalTopic(THROUGH_TOPIC_1);
+
+ return internalTopologyBuilder;
}
- private TopologyBuilder
createInternalRepartitioningWithValueTimestampTopology() {
- return builder
+ private InternalTopologyBuilder
createInternalRepartitioningWithValueTimestampTopology() {
+ final InternalTopologyBuilder internalTopologyBuilder =
((TopologyWrapper) topology
.addSource("source", INPUT_TOPIC_1)
- .addInternalTopic(THROUGH_TOPIC_1)
.addProcessor("processor", define(new ValueTimestampProcessor()),
"source")
.addSink("sink0", THROUGH_TOPIC_1, "processor")
.addSource("source1", THROUGH_TOPIC_1)
- .addSink("sink1", OUTPUT_TOPIC_1, "source1");
+ .addSink("sink1", OUTPUT_TOPIC_1, "source1"))
+ .getInternalBuilder();
+
+ internalTopologyBuilder.addInternalTopic(THROUGH_TOPIC_1);
+
+ return internalTopologyBuilder;
}
- private TopologyBuilder createForwardToSourceTopology() {
- return builder.addSource("source-1", INPUT_TOPIC_1)
+ private InternalTopologyBuilder createForwardToSourceTopology() {
+ return ((TopologyWrapper) topology.addSource("source-1", INPUT_TOPIC_1)
.addSink("sink-1", OUTPUT_TOPIC_1, "source-1")
.addSource("source-2", OUTPUT_TOPIC_1)
- .addSink("sink-2", OUTPUT_TOPIC_2, "source-2");
+ .addSink("sink-2", OUTPUT_TOPIC_2, "source-2"))
+ .getInternalBuilder();
}
- private TopologyBuilder createSimpleMultiSourceTopology(int partition) {
- return builder.addSource("source-1", STRING_DESERIALIZER,
STRING_DESERIALIZER, INPUT_TOPIC_1)
+ private InternalTopologyBuilder createSimpleMultiSourceTopology(int
partition) {
+ return ((TopologyWrapper) topology.addSource("source-1",
STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
.addProcessor("processor-1", define(new
ForwardingProcessor()), "source-1")
.addSink("sink-1", OUTPUT_TOPIC_1,
constantPartitioner(partition), "processor-1")
.addSource("source-2", STRING_DESERIALIZER,
STRING_DESERIALIZER, INPUT_TOPIC_2)
.addProcessor("processor-2", define(new
ForwardingProcessor()), "source-2")
- .addSink("sink-2", OUTPUT_TOPIC_2,
constantPartitioner(partition), "processor-2");
+ .addSink("sink-2", OUTPUT_TOPIC_2,
constantPartitioner(partition), "processor-2"))
+ .getInternalBuilder();
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 54ea1ce..9090012 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -347,7 +347,6 @@ public class StandbyTaskTest {
);
task.initializeStateStores();
-
restoreStateConsumer.assign(new
ArrayList<>(task.checkpointedOffsets().keySet()));
final byte[] serializedValue =
Serdes.Integer().serializer().serialize("", 1);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 4e04b49..0048e73 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
@@ -40,10 +41,10 @@ import
org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import
org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Test;
@@ -328,10 +329,10 @@ public class StreamsPartitionAssignorTest {
public void testAssignWithPartialTopology() {
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addProcessor("processor1", new MockProcessorSupplier(),
"source1");
- builder.addStateStore(new MockStateStoreSupplier("store1", false),
"processor1");
+
builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store1"),
Serdes.ByteArray(), Serdes.ByteArray()), "processor1");
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor2", new MockProcessorSupplier(),
"source2");
- builder.addStateStore(new MockStateStoreSupplier("store2", false),
"processor2");
+
builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store2"),
Serdes.ByteArray(), Serdes.ByteArray()), "processor2");
List<String> topics = Utils.mkList("topic1", "topic2");
Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
@@ -469,11 +470,11 @@ public class StreamsPartitionAssignorTest {
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor-1", new MockProcessorSupplier(),
"source1");
- builder.addStateStore(new MockStateStoreSupplier("store1", false),
"processor-1");
+
builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store1"),
Serdes.ByteArray(), Serdes.ByteArray()), "processor-1");
builder.addProcessor("processor-2", new MockProcessorSupplier(),
"source2");
- builder.addStateStore(new MockStateStoreSupplier("store2", false),
"processor-2");
- builder.addStateStore(new MockStateStoreSupplier("store3", false),
"processor-2");
+
builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store2"),
Serdes.ByteArray(), Serdes.ByteArray()), "processor-2");
+
builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store3"),
Serdes.ByteArray(), Serdes.ByteArray()), "processor-2");
List<String> topics = Utils.mkList("topic1", "topic2");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 665ebc0..5383c27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.state;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
import org.apache.kafka.streams.state.internals.RocksDBSessionStore;
@@ -25,16 +24,10 @@ import
org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.junit.Test;
-import java.util.Collections;
-import java.util.Map;
-
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.IsNot.not;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class StoresTest {
@@ -104,70 +97,10 @@ public class StoresTest {
Stores.sessionStoreBuilder(null, Serdes.ByteArray(),
Serdes.ByteArray());
}
- @SuppressWarnings("deprecation")
- @Test
- public void shouldCreateInMemoryStoreSupplierWithLoggedConfig() {
- final StateStoreSupplier supplier = Stores.create("store")
- .withKeys(Serdes.String())
- .withValues(Serdes.String())
- .inMemory()
- .enableLogging(Collections.singletonMap("retention.ms",
"1000"))
- .build();
-
- final Map<String, String> config = supplier.logConfig();
- assertTrue(supplier.loggingEnabled());
- assertEquals("1000", config.get("retention.ms"));
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void shouldCreateInMemoryStoreSupplierNotLogged() {
- final StateStoreSupplier supplier = Stores.create("store")
- .withKeys(Serdes.String())
- .withValues(Serdes.String())
- .inMemory()
- .disableLogging()
- .build();
-
- assertFalse(supplier.loggingEnabled());
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void shouldCreatePersistenStoreSupplierWithLoggedConfig() {
- final StateStoreSupplier supplier = Stores.create("store")
- .withKeys(Serdes.String())
- .withValues(Serdes.String())
- .persistent()
- .enableLogging(Collections.singletonMap("retention.ms",
"1000"))
- .build();
-
- final Map<String, String> config = supplier.logConfig();
- assertTrue(supplier.loggingEnabled());
- assertEquals("1000", config.get("retention.ms"));
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void shouldCreatePersistenStoreSupplierNotLogged() {
- final StateStoreSupplier supplier = Stores.create("store")
- .withKeys(Serdes.String())
- .withValues(Serdes.String())
- .persistent()
- .disableLogging()
- .build();
-
- assertFalse(supplier.loggingEnabled());
- }
-
@Test
public void
shouldThrowIllegalArgumentExceptionWhenTryingToConstructWindowStoreWithLessThanTwoSegments()
{
- final Stores.PersistentKeyValueFactory<String, String> storeFactory =
Stores.create("store")
- .withKeys(Serdes.String())
- .withValues(Serdes.String())
- .persistent();
try {
- storeFactory.windowed(1, 1, 1, false);
+ Stores.persistentWindowStore("store", 1, 1, 1, false);
fail("Should have thrown illegal argument exception as number of
segments is less than 2");
} catch (final IllegalArgumentException e) {
// ok
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 4ff0b90..a061ff2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -16,12 +16,18 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.NoOpReadOnlyStore;
+import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.StateStoreProviderStub;
import org.junit.Before;
import org.junit.Test;
@@ -64,7 +70,16 @@ public class CompositeReadOnlyKeyValueStoreTest {
}
private KeyValueStore<String, String> newStoreInstance() {
- return StateStoreTestUtils.newKeyValueStore(storeName, "app-id",
String.class, String.class);
+ final KeyValueStore<String, String> store =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
+ Serdes.String(),
+ Serdes.String())
+ .build();
+
+ store.init(new InternalMockProcessorContext(new
StateSerdes<>(ProcessorStateManager.storeChangelogTopic("appId", storeName),
Serdes.String(), Serdes.String()),
+ new NoOpRecordCollector()),
+ store);
+
+ return store;
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
deleted file mode 100644
index b25b8cb..0000000
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class RocksDBKeyValueStoreSupplierTest {
-
- private static final String STORE_NAME = "name";
- private final ThreadCache cache = new ThreadCache(new LogContext("test "),
1024, new MockStreamsMetrics(new Metrics()));
- private final InternalMockProcessorContext context = new
InternalMockProcessorContext(TestUtils.tempDirectory(),
-
Serdes.String(),
-
Serdes.String(),
- new
NoOpRecordCollector(),
-
cache);
- private KeyValueStore<String, String> store;
-
- @After
- public void close() {
- store.close();
- }
-
- @Test
- public void shouldCreateLoggingEnabledStoreWhenStoreLogged() {
- store = createStore(true, false);
- final List<ProducerRecord> logged = new ArrayList<>();
- final NoOpRecordCollector collector = new NoOpRecordCollector() {
- @Override
- public <K, V> void send(final String topic,
- K key,
- V value,
- Integer partition,
- Long timestamp,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer) {
- logged.add(new ProducerRecord<K, V>(topic, partition,
timestamp, key, value));
- }
- };
- final InternalMockProcessorContext context = new
InternalMockProcessorContext(TestUtils.tempDirectory(),
-
Serdes.String(),
-
Serdes.String(),
-
collector,
- cache);
- context.setTime(1);
- store.init(context, store);
- store.put("a", "b");
- assertFalse(logged.isEmpty());
- }
-
- @Test
- public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() {
- store = createStore(false, false);
- final List<ProducerRecord> logged = new ArrayList<>();
- final NoOpRecordCollector collector = new NoOpRecordCollector() {
- @Override
- public <K, V> void send(final String topic,
- K key,
- V value,
- Integer partition,
- Long timestamp,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer) {
- logged.add(new ProducerRecord<>(topic, partition, timestamp,
key, value));
- }
- };
- final InternalMockProcessorContext context = new
InternalMockProcessorContext(TestUtils.tempDirectory(),
-
Serdes.String(),
-
Serdes.String(),
-
collector,
- cache);
- context.setTime(1);
- store.init(context, store);
- store.put("a", "b");
- assertTrue(logged.isEmpty());
- }
-
- @Test
- public void shouldHaveCachedKeyValueStoreWhenCachingEnabled() {
- store = createStore(false, true);
- store.init(context, store);
- context.setTime(1);
- store.put("a", "b");
- store.put("b", "c");
- assertThat(((WrappedStateStore) store).wrappedStore(),
is(instanceOf(CachingKeyValueStore.class)));
- assertThat(cache.size(), is(2L));
- }
-
- @Test
- public void shouldReturnMeteredStoreWhenCachingAndLoggingDisabled() {
- store = createStore(false, false);
- assertThat(store, is(instanceOf(MeteredKeyValueBytesStore.class)));
- }
-
- @Test
- public void shouldReturnMeteredStoreWhenCachingDisabled() {
- store = createStore(true, false);
- assertThat(store, is(instanceOf(MeteredKeyValueBytesStore.class)));
- }
-
- @Test
- public void shouldHaveMeteredStoreWhenCached() {
- store = createStore(false, true);
- store.init(context, store);
- final StreamsMetrics metrics = context.metrics();
- assertFalse(metrics.metrics().isEmpty());
- }
-
- @Test
- public void shouldHaveMeteredStoreWhenLogged() {
- store = createStore(true, false);
- store.init(context, store);
- final StreamsMetrics metrics = context.metrics();
- assertFalse(metrics.metrics().isEmpty());
- }
-
- @SuppressWarnings("unchecked")
- private KeyValueStore<String, String> createStore(final boolean logged,
final boolean cached) {
- return new RocksDBKeyValueStoreSupplier<>(STORE_NAME,
- Serdes.String(),
- Serdes.String(),
- logged,
- Collections.EMPTY_MAP,
- cached).get();
- }
-
-}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
deleted file mode 100644
index c50dfba..0000000
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionWindow;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class RocksDBSessionStoreSupplierTest {
-
- private static final String STORE_NAME = "name";
- private final List<ProducerRecord> logged = new ArrayList<>();
- private final ThreadCache cache = new ThreadCache(new LogContext("test "),
1024, new MockStreamsMetrics(new Metrics()));
- private final InternalMockProcessorContext context = new
InternalMockProcessorContext(TestUtils.tempDirectory(),
- Serdes.String(),
- Serdes.String(),
- new NoOpRecordCollector() {
- @Override
- public <K, V> void send(final String topic,
- final K key,
- final V value,
- final Integer partition,
- final Long timestamp,
- final Serializer<K> keySerializer,
- final Serializer<V> valueSerializer) {
- logged.add(new ProducerRecord<>(topic, partition, timestamp,
key, value));
- }
- },
- cache);
-
- private SessionStore<String, String> store;
-
- @After
- public void close() {
- store.close();
- }
-
- @Test
- public void shouldCreateLoggingEnabledStoreWhenStoreLogged() {
- store = createStore(true, false);
- context.setTime(1);
- store.init(context, store);
- store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b");
- assertFalse(logged.isEmpty());
- }
-
- @Test
- public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() {
- store = createStore(false, false);
- context.setTime(1);
- store.init(context, store);
- store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b");
- assertTrue(logged.isEmpty());
- }
-
- @Test
- public void shouldReturnCachedSessionStoreWhenCachingEnabled() {
- store = createStore(false, true);
- store.init(context, store);
- context.setTime(1);
- store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b");
- store.put(new Windowed<>("b", new SessionWindow(0, 10)), "c");
- assertThat(((WrappedStateStore) store).wrappedStore(),
is(instanceOf(CachingSessionStore.class)));
- assertThat(cache.size(), is(2L));
- }
-
- @Test
- public void shouldHaveMeteredStoreWhenCached() {
- store = createStore(false, true);
- store.init(context, store);
- final StreamsMetrics metrics = context.metrics();
- assertFalse(metrics.metrics().isEmpty());
- }
-
- @Test
- public void shouldHaveMeteredStoreWhenLogged() {
- store = createStore(true, false);
- store.init(context, store);
- final StreamsMetrics metrics = context.metrics();
- assertFalse(metrics.metrics().isEmpty());
- }
-
- @Test
- public void shouldHaveMeteredStoreWhenNotLoggedOrCached() {
- store = createStore(false, false);
- store.init(context, store);
- final StreamsMetrics metrics = context.metrics();
- assertFalse(metrics.metrics().isEmpty());
- }
-
-
-
- private SessionStore<String, String> createStore(final boolean logged,
final boolean cached) {
- return new RocksDBSessionStoreSupplier<>(STORE_NAME,
- 10,
- Serdes.String(),
- Serdes.String(),
- logged,
- Collections.<String,
String>emptyMap(),
- cached).get();
- }
-
-}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
deleted file mode 100644
index 7409a13..0000000
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class RocksDBWindowStoreSupplierTest {
-
- private static final String STORE_NAME = "name";
- private WindowStore<String, String> store;
- private final ThreadCache cache = new ThreadCache(new LogContext("test "),
1024, new MockStreamsMetrics(new Metrics()));
- private final InternalMockProcessorContext context = new
InternalMockProcessorContext(TestUtils.tempDirectory(),
-
Serdes.String(),
-
Serdes.String(),
- new
NoOpRecordCollector(),
-
cache);
-
- @After
- public void close() {
- if (store != null) {
- store.close();
- }
- }
-
- @Test
- public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() {
- store = createStore(true, false, 3);
- final List<ProducerRecord> logged = new ArrayList<>();
- final NoOpRecordCollector collector = new NoOpRecordCollector() {
- @Override
- public <K, V> void send(final String topic,
- K key,
- V value,
- Integer partition,
- Long timestamp,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer) {
- logged.add(new ProducerRecord<K, V>(topic, partition,
timestamp, key, value));
- }
- };
- final InternalMockProcessorContext context = new
InternalMockProcessorContext(TestUtils.tempDirectory(),
-
Serdes.String(),
-
Serdes.String(),
-
collector,
- cache);
- context.setTime(1);
- store.init(context, store);
- store.put("a", "b");
- assertFalse(logged.isEmpty());
- }
-
- @Test
- public void shouldNotBeLoggingEnabledStoreWhenLogginNotEnabled() {
- store = createStore(false, false, 3);
- final List<ProducerRecord> logged = new ArrayList<>();
- final NoOpRecordCollector collector = new NoOpRecordCollector() {
- @Override
- public <K, V> void send(final String topic,
- K key,
- V value,
- Integer partition,
- Long timestamp,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer) {
- logged.add(new ProducerRecord<K, V>(topic, partition,
timestamp, key, value));
- }
- };
- final InternalMockProcessorContext context = new
InternalMockProcessorContext(TestUtils.tempDirectory(),
-
Serdes.String(),
-
Serdes.String(),
-
collector,
- cache);
- context.setTime(1);
- store.init(context, store);
- store.put("a", "b");
- assertTrue(logged.isEmpty());
- }
-
- @Test
- public void shouldBeCachedWindowStoreWhenCachingEnabled() {
- store = createStore(false, true, 3);
- store.init(context, store);
- context.setTime(1);
- store.put("a", "b");
- store.put("b", "c");
- assertThat(((WrappedStateStore) store).wrappedStore(),
is(instanceOf(CachingWindowStore.class)));
- assertThat(context.getCache().size(), is(2L));
- }
-
- @Test
- public void shouldHaveMeteredStoreAsOuterMost() {
- assertThat(createStore(false, false, 2),
instanceOf(MeteredWindowStore.class));
- assertThat(createStore(false, true, 2),
instanceOf(MeteredWindowStore.class));
- assertThat(createStore(true, false, 2),
instanceOf(MeteredWindowStore.class));
- }
- @Test
- public void shouldHaveMeteredStoreWhenCached() {
- store = createStore(false, true, 3);
- store.init(context, store);
- final StreamsMetrics metrics = context.metrics();
- assertFalse(metrics.metrics().isEmpty());
- }
-
- @Test
- public void shouldHaveMeteredStoreWhenLogged() {
- store = createStore(true, false, 3);
- store.init(context, store);
- final StreamsMetrics metrics = context.metrics();
- assertFalse(metrics.metrics().isEmpty());
- }
-
- @Test
- public void shouldHaveMeteredStoreWhenNotLoggedOrCached() {
- store = createStore(false, false, 3);
- store.init(context, store);
- final StreamsMetrics metrics = context.metrics();
- assertFalse(metrics.metrics().isEmpty());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void shouldThrowIllegalArgumentExceptionIfNumSegmentsLessThanTwo() {
- createStore(true, true, 1);
- }
-
- @SuppressWarnings("unchecked")
- private WindowStore<String, String> createStore(final boolean logged,
final boolean cached, final int numSegments) {
- return new RocksDBWindowStoreSupplier<>(STORE_NAME,
- 10,
- numSegments,
- false,
- Serdes.String(),
- Serdes.String(),
- 10,
- logged,
- Collections.<String,
String>emptyMap(),
- cached).get();
- }
-
-}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
deleted file mode 100644
index b1818c2..0000000
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
-
-import java.util.Collections;
-
-@SuppressWarnings("unchecked")
-public class StateStoreTestUtils {
-
- public static <K, V> KeyValueStore<K, V> newKeyValueStore(final String
name,
- final String
applicationId,
- final Class<K>
keyType,
- final Class<V>
valueType) {
- final InMemoryKeyValueStoreSupplier<K, V> supplier = new
InMemoryKeyValueStoreSupplier<>(name,
-
null,
-
null,
-
new MockTime(),
-
false,
-
Collections.<String, String>emptyMap());
-
- final StateStore stateStore = supplier.get();
- stateStore.init(
- new InternalMockProcessorContext(
- StateSerdes.withBuiltinTypes(
- ProcessorStateManager.storeChangelogTopic(applicationId,
name),
- keyType,
- valueType),
- new NoOpRecordCollector()),
- stateStore);
- return (KeyValueStore<K, V>) stateStore;
-
- }
-
-}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index dc04536..c24122a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -20,13 +20,14 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
@@ -68,21 +69,13 @@ public class StreamThreadStateStoreProviderTest {
private StreamThread threadMock;
private Map<TaskId, StreamTask> tasks;
- @SuppressWarnings("deprecation")
@Before
public void before() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("the-source", topicName);
- builder.addProcessor("the-processor", new MockProcessorSupplier(),
"the-source");
- builder.addStateStore(Stores.create("kv-store")
- .withStringKeys()
- .withStringValues().inMemory().build(), "the-processor");
-
- builder.addStateStore(Stores.create("window-store")
- .withStringKeys()
- .withStringValues()
- .persistent()
- .windowed(10, 10, 2, false).build(), "the-processor");
+ final TopologyWrapper topology = new TopologyWrapper();
+ topology.addSource("the-source", topicName);
+ topology.addProcessor("the-processor", new MockProcessorSupplier(),
"the-source");
+
topology.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv-store"),
Serdes.String(), Serdes.String()), "the-processor");
+
topology.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("window-store",
10, 2, 2, false), Serdes.String(), Serdes.String()), "the-processor");
final Properties properties = new Properties();
final String applicationId = "applicationId";
@@ -96,17 +89,17 @@ public class StreamThreadStateStoreProviderTest {
configureRestoreConsumer(clientSupplier,
"applicationId-kv-store-changelog");
configureRestoreConsumer(clientSupplier,
"applicationId-window-store-changelog");
- builder.setApplicationId(applicationId);
- final ProcessorTopology topology = builder.build(null);
+ topology.setApplicationId(applicationId);
+ final ProcessorTopology processorTopology =
topology.getInternalBuilder().build();
tasks = new HashMap<>();
stateDirectory = new StateDirectory(streamsConfig, new MockTime());
- taskOne = createStreamsTask(streamsConfig, clientSupplier, topology,
new TaskId(0, 0));
+ taskOne = createStreamsTask(streamsConfig, clientSupplier,
processorTopology, new TaskId(0, 0));
taskOne.initializeStateStores();
tasks.put(new TaskId(0, 0), taskOne);
- final StreamTask taskTwo = createStreamsTask(streamsConfig,
clientSupplier, topology, new TaskId(0, 1));
+ final StreamTask taskTwo = createStreamsTask(streamsConfig,
clientSupplier, processorTopology, new TaskId(0, 1));
taskTwo.initializeStateStores();
tasks.put(new TaskId(0, 1), taskTwo);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
index b5379d7..06c14ee 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
@@ -17,11 +17,13 @@
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.NoOpWindowStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.StateStoreProviderStub;
import org.junit.Before;
import org.junit.Test;
@@ -42,9 +44,15 @@ public class WrappingStoreProviderTest {
final StateStoreProviderStub stubProviderTwo = new
StateStoreProviderStub(false);
- stubProviderOne.addStore("kv",
StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class,
String.class));
+ stubProviderOne.addStore("kv",
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"),
+ Serdes.serdeFrom(String.class),
+ Serdes.serdeFrom(String.class))
+ .build());
stubProviderOne.addStore("window", new NoOpWindowStore());
- stubProviderTwo.addStore("kv",
StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class,
String.class));
+ stubProviderTwo.addStore("kv",
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"),
+ Serdes.serdeFrom(String.class),
+ Serdes.serdeFrom(String.class))
+ .build());
stubProviderTwo.addStore("window", new NoOpWindowStore());
wrappingStoreProvider = new WrappingStoreProvider(
--
To stop receiving notification emails like this one, please contact
[email protected].