Repository: kafka Updated Branches: refs/heads/trunk 69a1cced4 -> 5e8958a85
MINOR: initialize Serdes with ProcessorContext guozhangwang Author: Yasuhiro Matsuda <[email protected]> Reviewers: Guozhang Wang Closes #589 from ymatsuda/init_serdes_with_procctx Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5e8958a8 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5e8958a8 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5e8958a8 Branch: refs/heads/trunk Commit: 5e8958a856a5b4ccbdcb610473cab4c2eeddbac5 Parents: 69a1cce Author: Yasuhiro Matsuda <[email protected]> Authored: Wed Nov 25 15:21:17 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Nov 25 15:21:17 2015 -0800 ---------------------------------------------------------------------- .../kafka/streams/examples/ProcessorJob.java | 2 +- .../streams/state/MeteredKeyValueStore.java | 1 + .../state/RocksDBKeyValueStoreSupplier.java | 2 + .../org/apache/kafka/streams/state/Serdes.java | 61 +++++++------------- .../org/apache/kafka/streams/state/Stores.java | 5 +- .../internals/ProcessorTopologyTest.java | 2 +- .../state/AbstractKeyValueStoreTest.java | 12 ++-- .../state/InMemoryKeyValueStoreTest.java | 6 +- .../state/InMemoryLRUCacheStoreTest.java | 7 ++- .../streams/state/KeyValueStoreTestDriver.java | 9 --- .../streams/state/RocksDBKeyValueStoreTest.java | 6 +- 11 files changed, 40 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java index 3274aae..882c7ed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java +++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java @@ -104,7 +104,7 @@ public class ProcessorJob { builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source"); builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE"); - builder.addStateStore(Stores.create("local-state", config).withStringKeys().withIntegerValues().inMemory().build()); + builder.addStateStore(Stores.create("local-state").withStringKeys().withIntegerValues().inMemory().build()); builder.connectProcessorAndStateStores("local-state", "PROCESS"); builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS"); http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java index c1ccbd4..b68f763 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java @@ -91,6 +91,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> { this.flushTime = this.metrics.addLatencySensor(metricGrp, name, "flush", "store-name", name); this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name); + serialization.init(context); this.context = context; this.partition = context.id().partition; http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java index fe8f00a..f1fbd9f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java @@ -118,6 +118,8 @@ public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier { } public void init(ProcessorContext context) { + serdes.init(context); + this.context = context; this.partition = context.id().partition; this.dbName = this.topic + "." + this.partition; http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java index 31bd439..f41d928 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java @@ -26,7 +26,7 @@ import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.processor.ProcessorContext; final class Serdes<K, V> { @@ -57,19 +57,21 @@ final class Serdes<K, V> { } private final String topic; - private final Serializer<K> keySerializer; - private final Serializer<V> valueSerializer; - private final Deserializer<K> keyDeserializer; - private final Deserializer<V> valueDeserializer; + private Serializer<K> keySerializer; + private Serializer<V> valueSerializer; + private Deserializer<K> keyDeserializer; + private Deserializer<V> valueDeserializer; /** - * Create a context for serialization using the specified serializers and deserializers. + * Create a context for serialization using the specified serializers and deserializers, or if any of them are null the + * corresponding {@link ProcessorContext}'s serializer or deserializer, which + * <em>must</em> match the key and value types used as parameters for this object. * * @param topic the name of the topic - * @param keySerializer the serializer for keys; may not be null - * @param keyDeserializer the deserializer for keys; may not be null - * @param valueSerializer the serializer for values; may not be null - * @param valueDeserializer the deserializer for values; may not be null + * @param keySerializer the serializer for keys; may be null + * @param keyDeserializer the deserializer for keys; may be null + * @param valueSerializer the serializer for values; may be null + * @param valueDeserializer the deserializer for values; may be null */ public Serdes(String topic, Serializer<K> keySerializer, Deserializer<K> keyDeserializer, @@ -82,45 +84,22 @@ final class Serdes<K, V> { } /** - * Create a context for serialization using the specified serializers and deserializers, or if any of them are null the - * corresponding {@link StreamingConfig}'s serializer or deserializer, which + * Create a context for serialization using the {@link ProcessorContext}'s serializers and deserializers, which * <em>must</em> match the key and value types used as parameters for this object. * * @param topic the name of the topic - * @param keySerializer the serializer for keys; may be null if the {@link StreamingConfig#keySerializer() default - * key serializer} should be used - * @param keyDeserializer the deserializer for keys; may be null if the {@link StreamingConfig#keyDeserializer() default - * key deserializer} should be used - * @param valueSerializer the serializer for values; may be null if the {@link StreamingConfig#valueSerializer() default - * value serializer} should be used - * @param valueDeserializer the deserializer for values; may be null if the {@link StreamingConfig#valueDeserializer() - * default value deserializer} should be used - * @param config the streaming config */ @SuppressWarnings("unchecked") - public Serdes(String topic, - Serializer<K> keySerializer, Deserializer<K> keyDeserializer, - Serializer<V> valueSerializer, Deserializer<V> valueDeserializer, - StreamingConfig config) { - this.topic = topic; - - this.keySerializer = keySerializer != null ? keySerializer : config.keySerializer(); - this.keyDeserializer = keyDeserializer != null ? keyDeserializer : config.keyDeserializer(); - this.valueSerializer = valueSerializer != null ? valueSerializer : config.valueSerializer(); - this.valueDeserializer = valueDeserializer != null ? valueDeserializer : config.valueDeserializer(); + public Serdes(String topic) { + this(topic, null, null, null, null); } - /** - * Create a context for serialization using the {@link StreamingConfig}'s serializers and deserializers, which - * <em>must</em> match the key and value types used as parameters for this object. - * - * @param topic the name of the topic - * @param config the streaming config - */ @SuppressWarnings("unchecked") - public Serdes(String topic, - StreamingConfig config) { - this(topic, null, null, null, null, config); + public void init(ProcessorContext context) { + keySerializer = keySerializer != null ? keySerializer : (Serializer<K>) context.keySerializer(); + keyDeserializer = keyDeserializer != null ? keyDeserializer : (Deserializer<K>) context.keyDeserializer(); + valueSerializer = valueSerializer != null ? valueSerializer : (Serializer<V>) context.valueSerializer(); + valueDeserializer = valueDeserializer != null ? valueDeserializer : (Deserializer<V>) context.valueDeserializer(); } public Deserializer<K> keyDeserializer() { http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/state/Stores.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index c5f040f..5452040 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -26,7 +26,6 @@ import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.StreamingConfig; import org.apache.kafka.streams.processor.StateStoreSupplier; /** @@ -40,7 +39,7 @@ public class Stores { * @param name the name of the store * @return the factory that can be used to specify other options or configurations for the store; never null */ - public static StoreFactory create(final String name, final StreamingConfig config) { + public static StoreFactory create(final String name) { return new StoreFactory() { @Override public <K> ValueFactory<K> withKeys(final Serializer<K> keySerializer, final Deserializer<K> keyDeserializer) { @@ -49,7 +48,7 @@ public class Stores { public <V> KeyValueFactory<K, V> withValues(final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer) { final Serdes<K, V> serdes = - new Serdes<>(name, keySerializer, keyDeserializer, valueSerializer, valueDeserializer, config); + new Serdes<>(name, keySerializer, keyDeserializer, valueSerializer, valueDeserializer); return new KeyValueFactory<K, V>() { @Override public InMemoryKeyValueFactory<K, V> inMemory() { http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 54096b2..2f359bc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -204,7 +204,7 @@ public class ProcessorTopologyTest { return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC) .addProcessor("processor", define(new StatefulProcessor(storeName)), "source") .addStateStore( - Stores.create(storeName, config).withStringKeys().withStringValues().inMemory().build(), + Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(), "processor" ) .addSink("counts", OUTPUT_TOPIC_1, "processor"); http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java index 209f3c9..d40f308 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java @@ -21,14 +21,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; -import org.apache.kafka.streams.StreamingConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.junit.Test; public abstract class AbstractKeyValueStoreTest { - protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(StreamingConfig config, - ProcessorContext context, + protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context, Class<K> keyClass, Class<V> valueClass, boolean useContextSerdes); @@ -36,7 +34,7 @@ public abstract class AbstractKeyValueStoreTest { public void testPutGetRange() { // Create the test driver ... KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(); - KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, false); + KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false); try { // Verify that the store reads and writes correctly ... @@ -102,7 +100,7 @@ public abstract class AbstractKeyValueStoreTest { public void testPutGetRangeWithDefaultSerdes() { // Create the test driver ... KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class); - KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, true); + KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true); try { // Verify that the store reads and writes correctly ... @@ -152,7 +150,7 @@ public abstract class AbstractKeyValueStoreTest { // Create the store, which should register with the context and automatically // receive the restore entries ... - KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, false); + KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false); try { // Verify that the store's contents were properly restored ... assertEquals(0, driver.checkForRestoredEntries(store)); @@ -178,7 +176,7 @@ public abstract class AbstractKeyValueStoreTest { // Create the store, which should register with the context and automatically // receive the restore entries ... - KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, true); + KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true); try { // Verify that the store's contents were properly restored ... assertEquals(0, driver.checkForRestoredEntries(store)); http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java index b3fe98c..2b90d0a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.state; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.StreamingConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreSupplier; @@ -27,7 +26,6 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest { @SuppressWarnings("unchecked") @Override protected <K, V> KeyValueStore<K, V> createKeyValueStore( - StreamingConfig config, ProcessorContext context, Class<K> keyClass, Class<V> valueClass, boolean useContextSerdes) { @@ -38,9 +36,9 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest { Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer(); Serializer<V> valSer = (Serializer<V>) context.valueSerializer(); Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer(); - supplier = Stores.create("my-store", config).withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build(); + supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build(); } else { - supplier = Stores.create("my-store", config).withKeys(keyClass).withValues(valueClass).inMemory().build(); + supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().build(); } KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get(); http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java index dddb9c7..81adfad 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java @@ -26,11 +26,12 @@ import org.junit.Test; public class InMemoryLRUCacheStoreTest { + @SuppressWarnings("unchecked") @Test public void testPutGetRange() { // Create the test driver ... KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(); - StateStoreSupplier supplier = Stores.create("my-store", driver.config()) + StateStoreSupplier supplier = Stores.create("my-store") .withIntegerKeys().withStringValues() .inMemory().maxEntries(3) .build(); @@ -82,7 +83,7 @@ public class InMemoryLRUCacheStoreTest { Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer(); Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer(); Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer(); - StateStoreSupplier supplier = Stores.create("my-store", driver.config()) + StateStoreSupplier supplier = Stores.create("my-store") .withKeys(keySer, keyDeser) .withValues(valSer, valDeser) .inMemory().maxEntries(3) @@ -138,7 +139,7 @@ public class InMemoryLRUCacheStoreTest { // Create the store, which should register with the context and automatically // receive the restore entries ... - StateStoreSupplier supplier = Stores.create("my-store", driver.config()) + StateStoreSupplier supplier = Stores.create("my-store") .withIntegerKeys().withStringValues() .inMemory().maxEntries(3) .build(); http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 8bab1c9..28cc3af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -362,15 +362,6 @@ public class KeyValueStoreTestDriver<K, V> { } /** - * Get the streaming config that should be supplied to a {@link Serdes}'s constructor. - * - * @return the streaming config; never null - */ - public StreamingConfig config() { - return config; - } - - /** * Get the context that should be supplied to a {@link KeyValueStore}'s constructor. This context records any messages * written by the store to the Kafka topic, making them available via the {@link #flushedEntryStored(Object)} and * {@link #flushedEntryRemoved(Object)} methods. http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java index 37a12f9..20e92ef 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.state; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.StreamingConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreSupplier; @@ -27,7 +26,6 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { @SuppressWarnings("unchecked") @Override protected <K, V> KeyValueStore<K, V> createKeyValueStore( - StreamingConfig config, ProcessorContext context, Class<K> keyClass, Class<V> valueClass, @@ -39,9 +37,9 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer(); Serializer<V> valSer = (Serializer<V>) context.valueSerializer(); Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer(); - supplier = Stores.create("my-store", config).withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build(); + supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build(); } else { - supplier = Stores.create("my-store", config).withKeys(keyClass).withValues(valueClass).localDatabase().build(); + supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).localDatabase().build(); } KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
