Repository: kafka
Updated Branches:
  refs/heads/trunk 69a1cced4 -> 5e8958a85


MINOR: initialize Serdes with ProcessorContext

guozhangwang

Author: Yasuhiro Matsuda <[email protected]>

Reviewers: Guozhang Wang

Closes #589 from ymatsuda/init_serdes_with_procctx


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5e8958a8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5e8958a8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5e8958a8

Branch: refs/heads/trunk
Commit: 5e8958a856a5b4ccbdcb610473cab4c2eeddbac5
Parents: 69a1cce
Author: Yasuhiro Matsuda <[email protected]>
Authored: Wed Nov 25 15:21:17 2015 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Wed Nov 25 15:21:17 2015 -0800

----------------------------------------------------------------------
 .../kafka/streams/examples/ProcessorJob.java    |  2 +-
 .../streams/state/MeteredKeyValueStore.java     |  1 +
 .../state/RocksDBKeyValueStoreSupplier.java     |  2 +
 .../org/apache/kafka/streams/state/Serdes.java  | 61 +++++++-------------
 .../org/apache/kafka/streams/state/Stores.java  |  5 +-
 .../internals/ProcessorTopologyTest.java        |  2 +-
 .../state/AbstractKeyValueStoreTest.java        | 12 ++--
 .../state/InMemoryKeyValueStoreTest.java        |  6 +-
 .../state/InMemoryLRUCacheStoreTest.java        |  7 ++-
 .../streams/state/KeyValueStoreTestDriver.java  |  9 ---
 .../streams/state/RocksDBKeyValueStoreTest.java |  6 +-
 11 files changed, 40 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java 
b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
index 3274aae..882c7ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -104,7 +104,7 @@ public class ProcessorJob {
         builder.addSource("SOURCE", new StringDeserializer(), new 
StringDeserializer(), "topic-source");
 
         builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
-        builder.addStateStore(Stores.create("local-state", 
config).withStringKeys().withIntegerValues().inMemory().build());
+        
builder.addStateStore(Stores.create("local-state").withStringKeys().withIntegerValues().inMemory().build());
         builder.connectProcessorAndStateStores("local-state", "PROCESS");
 
         builder.addSink("SINK", "topic-sink", new StringSerializer(), new 
IntegerSerializer(), "PROCESS");

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index c1ccbd4..b68f763 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -91,6 +91,7 @@ public class MeteredKeyValueStore<K, V> implements 
KeyValueStore<K, V> {
         this.flushTime = this.metrics.addLatencySensor(metricGrp, name, 
"flush", "store-name", name);
         this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, 
"restore", "store-name", name);
 
+        serialization.init(context);
         this.context = context;
         this.partition = context.id().partition;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
index fe8f00a..f1fbd9f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
@@ -118,6 +118,8 @@ public class RocksDBKeyValueStoreSupplier<K, V> implements 
StateStoreSupplier {
         }
 
         public void init(ProcessorContext context) {
+            serdes.init(context);
+
             this.context = context;
             this.partition = context.id().partition;
             this.dbName = this.topic + "." + this.partition;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
index 31bd439..f41d928 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
 
 final class Serdes<K, V> {
 
@@ -57,19 +57,21 @@ final class Serdes<K, V> {
     }
 
     private final String topic;
-    private final Serializer<K> keySerializer;
-    private final Serializer<V> valueSerializer;
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valueDeserializer;
+    private Serializer<K> keySerializer;
+    private Serializer<V> valueSerializer;
+    private Deserializer<K> keyDeserializer;
+    private Deserializer<V> valueDeserializer;
 
     /**
-     * Create a context for serialization using the specified serializers and 
deserializers.
+     * Create a context for serialization using the specified serializers and 
deserializers, or if any of them are null the
+     * corresponding {@link ProcessorContext}'s serializer or deserializer, 
which
+     * <em>must</em> match the key and value types used as parameters for this 
object.
      *
      * @param topic the name of the topic
-     * @param keySerializer the serializer for keys; may not be null
-     * @param keyDeserializer the deserializer for keys; may not be null
-     * @param valueSerializer the serializer for values; may not be null
-     * @param valueDeserializer the deserializer for values; may not be null
+     * @param keySerializer the serializer for keys; may be null
+     * @param keyDeserializer the deserializer for keys; may be null
+     * @param valueSerializer the serializer for values; may be null
+     * @param valueDeserializer the deserializer for values; may be null
      */
     public Serdes(String topic,
             Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
@@ -82,45 +84,22 @@ final class Serdes<K, V> {
     }
 
     /**
-     * Create a context for serialization using the specified serializers and 
deserializers, or if any of them are null the
-     * corresponding {@link StreamingConfig}'s serializer or deserializer, 
which
+     * Create a context for serialization using the {@link ProcessorContext}'s 
serializers and deserializers, which
      * <em>must</em> match the key and value types used as parameters for this 
object.
      *
      * @param topic the name of the topic
-     * @param keySerializer the serializer for keys; may be null if the {@link 
StreamingConfig#keySerializer() default
-     *            key serializer} should be used
-     * @param keyDeserializer the deserializer for keys; may be null if the 
{@link StreamingConfig#keyDeserializer() default
-     *            key deserializer} should be used
-     * @param valueSerializer the serializer for values; may be null if the 
{@link StreamingConfig#valueSerializer() default
-     *            value serializer} should be used
-     * @param valueDeserializer the deserializer for values; may be null if 
the {@link StreamingConfig#valueDeserializer()
-     *            default value deserializer} should be used
-     * @param config the streaming config
      */
     @SuppressWarnings("unchecked")
-    public Serdes(String topic,
-            Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
-            Serializer<V> valueSerializer, Deserializer<V> valueDeserializer,
-            StreamingConfig config) {
-        this.topic = topic;
-
-        this.keySerializer = keySerializer != null ? keySerializer : 
config.keySerializer();
-        this.keyDeserializer = keyDeserializer != null ? keyDeserializer : 
config.keyDeserializer();
-        this.valueSerializer = valueSerializer != null ? valueSerializer : 
config.valueSerializer();
-        this.valueDeserializer = valueDeserializer != null ? valueDeserializer 
: config.valueDeserializer();
+    public Serdes(String topic) {
+        this(topic, null, null, null, null);
     }
 
-    /**
-     * Create a context for serialization using the {@link StreamingConfig}'s 
serializers and deserializers, which
-     * <em>must</em> match the key and value types used as parameters for this 
object.
-     *
-     * @param topic the name of the topic
-     * @param config the streaming config
-     */
     @SuppressWarnings("unchecked")
-    public Serdes(String topic,
-                  StreamingConfig config) {
-        this(topic, null, null, null, null, config);
+    public void init(ProcessorContext context) {
+        keySerializer = keySerializer != null ? keySerializer : 
(Serializer<K>) context.keySerializer();
+        keyDeserializer = keyDeserializer != null ? keyDeserializer : 
(Deserializer<K>) context.keyDeserializer();
+        valueSerializer = valueSerializer != null ? valueSerializer : 
(Serializer<V>) context.valueSerializer();
+        valueDeserializer = valueDeserializer != null ? valueDeserializer : 
(Deserializer<V>) context.valueDeserializer();
     }
 
     public Deserializer<K> keyDeserializer() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c5f040f..5452040 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 /**
@@ -40,7 +39,7 @@ public class Stores {
      * @param name the name of the store
      * @return the factory that can be used to specify other options or 
configurations for the store; never null
      */
-    public static StoreFactory create(final String name, final StreamingConfig 
config) {
+    public static StoreFactory create(final String name) {
         return new StoreFactory() {
             @Override
             public <K> ValueFactory<K> withKeys(final Serializer<K> 
keySerializer, final Deserializer<K> keyDeserializer) {
@@ -49,7 +48,7 @@ public class Stores {
                     public <V> KeyValueFactory<K, V> withValues(final 
Serializer<V> valueSerializer,
                                                                 final 
Deserializer<V> valueDeserializer) {
                         final Serdes<K, V> serdes =
-                                new Serdes<>(name, keySerializer, 
keyDeserializer, valueSerializer, valueDeserializer, config);
+                                new Serdes<>(name, keySerializer, 
keyDeserializer, valueSerializer, valueDeserializer);
                         return new KeyValueFactory<K, V>() {
                             @Override
                             public InMemoryKeyValueFactory<K, V> inMemory() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 54096b2..2f359bc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -204,7 +204,7 @@ public class ProcessorTopologyTest {
         return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC)
                                     .addProcessor("processor", define(new 
StatefulProcessor(storeName)), "source")
                                     .addStateStore(
-                                            Stores.create(storeName, 
config).withStringKeys().withStringValues().inMemory().build(),
+                                            
Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
                                             "processor"
                                     )
                                     .addSink("counts", OUTPUT_TOPIC_1, 
"processor");

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
index 209f3c9..d40f308 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
@@ -21,14 +21,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
-import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.junit.Test;
 
 public abstract class AbstractKeyValueStoreTest {
 
-    protected abstract <K, V> KeyValueStore<K, V> 
createKeyValueStore(StreamingConfig config,
-                                                                      
ProcessorContext context,
+    protected abstract <K, V> KeyValueStore<K, V> 
createKeyValueStore(ProcessorContext context,
                                                                       Class<K> 
keyClass, Class<V> valueClass,
                                                                       boolean 
useContextSerdes);
 
@@ -36,7 +34,7 @@ public abstract class AbstractKeyValueStoreTest {
     public void testPutGetRange() {
         // Create the test driver ...
         KeyValueStoreTestDriver<Integer, String> driver = 
KeyValueStoreTestDriver.create();
-        KeyValueStore<Integer, String> store = 
createKeyValueStore(driver.config(), driver.context(), Integer.class, 
String.class, false);
+        KeyValueStore<Integer, String> store = 
createKeyValueStore(driver.context(), Integer.class, String.class, false);
         try {
 
             // Verify that the store reads and writes correctly ...
@@ -102,7 +100,7 @@ public abstract class AbstractKeyValueStoreTest {
     public void testPutGetRangeWithDefaultSerdes() {
         // Create the test driver ...
         KeyValueStoreTestDriver<Integer, String> driver = 
KeyValueStoreTestDriver.create(Integer.class, String.class);
-        KeyValueStore<Integer, String> store = 
createKeyValueStore(driver.config(), driver.context(), Integer.class, 
String.class, true);
+        KeyValueStore<Integer, String> store = 
createKeyValueStore(driver.context(), Integer.class, String.class, true);
         try {
 
             // Verify that the store reads and writes correctly ...
@@ -152,7 +150,7 @@ public abstract class AbstractKeyValueStoreTest {
 
         // Create the store, which should register with the context and 
automatically
         // receive the restore entries ...
-        KeyValueStore<Integer, String> store = 
createKeyValueStore(driver.config(), driver.context(), Integer.class, 
String.class, false);
+        KeyValueStore<Integer, String> store = 
createKeyValueStore(driver.context(), Integer.class, String.class, false);
         try {
             // Verify that the store's contents were properly restored ...
             assertEquals(0, driver.checkForRestoredEntries(store));
@@ -178,7 +176,7 @@ public abstract class AbstractKeyValueStoreTest {
 
         // Create the store, which should register with the context and 
automatically
         // receive the restore entries ...
-        KeyValueStore<Integer, String> store = 
createKeyValueStore(driver.config(), driver.context(), Integer.class, 
String.class, true);
+        KeyValueStore<Integer, String> store = 
createKeyValueStore(driver.context(), Integer.class, String.class, true);
         try {
             // Verify that the store's contents were properly restored ...
             assertEquals(0, driver.checkForRestoredEntries(store));

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
index b3fe98c..2b90d0a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 
@@ -27,7 +26,6 @@ public class InMemoryKeyValueStoreTest extends 
AbstractKeyValueStoreTest {
     @SuppressWarnings("unchecked")
     @Override
     protected <K, V> KeyValueStore<K, V> createKeyValueStore(
-            StreamingConfig config,
             ProcessorContext context,
             Class<K> keyClass, Class<V> valueClass,
             boolean useContextSerdes) {
@@ -38,9 +36,9 @@ public class InMemoryKeyValueStoreTest extends 
AbstractKeyValueStoreTest {
             Deserializer<K> keyDeser = (Deserializer<K>) 
context.keyDeserializer();
             Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
             Deserializer<V> valDeser = (Deserializer<V>) 
context.valueDeserializer();
-            supplier = Stores.create("my-store", config).withKeys(keySer, 
keyDeser).withValues(valSer, valDeser).inMemory().build();
+            supplier = Stores.create("my-store").withKeys(keySer, 
keyDeser).withValues(valSer, valDeser).inMemory().build();
         } else {
-            supplier = Stores.create("my-store", 
config).withKeys(keyClass).withValues(valueClass).inMemory().build();
+            supplier = 
Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().build();
         }
 
         KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
index dddb9c7..81adfad 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
@@ -26,11 +26,12 @@ import org.junit.Test;
 
 public class InMemoryLRUCacheStoreTest {
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testPutGetRange() {
         // Create the test driver ...
         KeyValueStoreTestDriver<Integer, String> driver = 
KeyValueStoreTestDriver.create();
-        StateStoreSupplier supplier = Stores.create("my-store", 
driver.config())
+        StateStoreSupplier supplier = Stores.create("my-store")
                                                      
.withIntegerKeys().withStringValues()
                                                      .inMemory().maxEntries(3)
                                                      .build();
@@ -82,7 +83,7 @@ public class InMemoryLRUCacheStoreTest {
         Deserializer<Integer> keyDeser = (Deserializer<Integer>) 
driver.context().keyDeserializer();
         Serializer<String> valSer = (Serializer<String>) 
driver.context().valueSerializer();
         Deserializer<String> valDeser = (Deserializer<String>) 
driver.context().valueDeserializer();
-        StateStoreSupplier supplier = Stores.create("my-store", 
driver.config())
+        StateStoreSupplier supplier = Stores.create("my-store")
                                                      .withKeys(keySer, 
keyDeser)
                                                      .withValues(valSer, 
valDeser)
                                                      .inMemory().maxEntries(3)
@@ -138,7 +139,7 @@ public class InMemoryLRUCacheStoreTest {
 
         // Create the store, which should register with the context and 
automatically
         // receive the restore entries ...
-        StateStoreSupplier supplier = Stores.create("my-store", 
driver.config())
+        StateStoreSupplier supplier = Stores.create("my-store")
                                                      
.withIntegerKeys().withStringValues()
                                                      .inMemory().maxEntries(3)
                                                      .build();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 8bab1c9..28cc3af 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -362,15 +362,6 @@ public class KeyValueStoreTestDriver<K, V> {
     }
 
     /**
-     * Get the streaming config that should be supplied to a {@link Serdes}'s 
constructor.
-     *
-     * @return the streaming config; never null
-     */
-    public StreamingConfig config() {
-        return config;
-    }
-
-    /**
      * Get the context that should be supplied to a {@link KeyValueStore}'s 
constructor. This context records any messages
      * written by the store to the Kafka topic, making them available via the 
{@link #flushedEntryStored(Object)} and
      * {@link #flushedEntryRemoved(Object)} methods.

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
index 37a12f9..20e92ef 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 
@@ -27,7 +26,6 @@ public class RocksDBKeyValueStoreTest extends 
AbstractKeyValueStoreTest {
     @SuppressWarnings("unchecked")
     @Override
     protected <K, V> KeyValueStore<K, V> createKeyValueStore(
-            StreamingConfig config,
             ProcessorContext context,
             Class<K> keyClass,
             Class<V> valueClass,
@@ -39,9 +37,9 @@ public class RocksDBKeyValueStoreTest extends 
AbstractKeyValueStoreTest {
             Deserializer<K> keyDeser = (Deserializer<K>) 
context.keyDeserializer();
             Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
             Deserializer<V> valDeser = (Deserializer<V>) 
context.valueDeserializer();
-            supplier = Stores.create("my-store", config).withKeys(keySer, 
keyDeser).withValues(valSer, valDeser).localDatabase().build();
+            supplier = Stores.create("my-store").withKeys(keySer, 
keyDeser).withValues(valSer, valDeser).localDatabase().build();
         } else {
-            supplier = Stores.create("my-store", 
config).withKeys(keyClass).withValues(valueClass).localDatabase().build();
+            supplier = 
Stores.create("my-store").withKeys(keyClass).withValues(valueClass).localDatabase().build();
         }
 
         KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();

Reply via email to