Repository: kafka
Updated Branches:
  refs/heads/trunk f57dabbe5 -> dea0719e9
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index a5990bd..be851bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -89,14 +90,10 @@ public class StreamPartitionAssignorTest {
     private Properties configProps() {
         return new Properties() {
             {
-                setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test");
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index f2ade6b..33fa5c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.test.MockSourceNode;
+import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.Test;
 import org.junit.Before;
 
@@ -73,15 +74,11 @@ public class StreamTaskTest {
     private StreamsConfig createConfig(final File baseDir) throws Exception {
         return new StreamsConfig(new Properties() {
             {
-                setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-task-test");
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index b201c07..e387a59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.Test;
 
 import java.io.File;
@@ -113,14 +114,10 @@ public class StreamThreadTest {
     private Properties configProps() {
         return new Properties() {
             {
-                setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
             }
         };
     }
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 063eafe..ce4956c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -18,8 +18,6 @@
 package org.apache.kafka.streams.smoketest;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -85,11 +83,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class);
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class.getName());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
         props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
@@ -98,9 +92,9 @@ public class SmokeTestClient extends SmokeTestUtil {
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<String, Integer> source = builder.stream(stringDeserializer, integerDeserializer, "data");
+        KStream<String, Integer> source = builder.stream(stringSerde, intSerde, "data");
 
-        source.to("echo", stringSerializer, integerSerializer);
+        source.to("echo", stringSerde, intSerde);
 
         KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
             @Override
@@ -125,15 +119,13 @@ public class SmokeTestClient extends SmokeTestUtil {
                 }
             },
             UnlimitedWindows.of("uwin-min"),
-            stringSerializer,
-            integerSerializer,
-            stringDeserializer,
-            integerDeserializer
+            stringSerde,
+            intSerde
         ).toStream().map(
             new Unwindow<String, Integer>()
-        ).to("min", stringSerializer, integerSerializer);
+        ).to("min", stringSerde, intSerde);
 
-        KTable<String, Integer> minTable = builder.table(stringSerializer, integerSerializer, stringDeserializer, integerDeserializer, "min");
+        KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min");
         minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min"));
 
         // max
@@ -150,15 +142,13 @@ public class SmokeTestClient extends SmokeTestUtil {
                 }
             },
             UnlimitedWindows.of("uwin-max"),
-            stringSerializer,
-            integerSerializer,
-            stringDeserializer,
-            integerDeserializer
+            stringSerde,
+            intSerde
         ).toStream().map(
             new Unwindow<String, Integer>()
-        ).to("max", stringSerializer, integerSerializer);
+        ).to("max", stringSerde, intSerde);
 
-        KTable<String, Integer> maxTable = builder.table(stringSerializer, integerSerializer, stringDeserializer, integerDeserializer, "max");
+        KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max");
         maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max"));
 
         // sum
@@ -175,28 +165,25 @@ public class SmokeTestClient extends SmokeTestUtil {
                 }
             },
             UnlimitedWindows.of("win-sum"),
-            stringSerializer,
-            longSerializer,
-            stringDeserializer,
-            longDeserializer
+            stringSerde,
+            longSerde
         ).toStream().map(
             new Unwindow<String, Long>()
-        ).to("sum", stringSerializer, longSerializer);
+        ).to("sum", stringSerde, longSerde);
 
-        KTable<String, Long> sumTable = builder.table(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "sum");
+        KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum");
         sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum"));
 
         // cnt
         data.countByKey(
                 UnlimitedWindows.of("uwin-cnt"),
-                stringSerializer,
-                stringDeserializer
+                stringSerde
         ).toStream().map(
                 new Unwindow<String, Long>()
-        ).to("cnt", stringSerializer, longSerializer);
+        ).to("cnt", stringSerde, longSerde);
 
-        KTable<String, Long> cntTable = builder.table(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "cnt");
+        KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt");
         cntTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("cnt"));
 
         // dif
@@ -206,7 +193,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                     return value1 - value2;
                 }
             }
-        ).to("dif", stringSerializer, integerSerializer);
+        ).to("dif", stringSerde, intSerde);
 
         // avg
         sumTable.join(
@@ -216,13 +203,12 @@ public class SmokeTestClient extends SmokeTestUtil {
                     return (double) value1 / (double) value2;
                 }
             }
-        ).to("avg", stringSerializer, doubleSerializer);
+        ).to("avg", stringSerde, doubleSerde);
 
         // windowed count
         data.countByKey(
                 TumblingWindows.of("tumbling-win-cnt").with(WINDOW_SIZE),
-                stringSerializer,
-                stringDeserializer
+                stringSerde
         ).toStream().map(
             new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {
                 @Override
@@ -230,7 +216,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                     return new KeyValue<>(key.value() + "@" + key.window().start(), value);
                 }
             }
-        ).to("wcnt", stringSerializer, longSerializer);
+        ).to("wcnt", stringSerde, longSerde);
 
         // test repartition
         Agg agg = new Agg();
@@ -239,14 +225,11 @@ public class SmokeTestClient extends SmokeTestUtil {
                 agg.adder(),
                 agg.remover(),
                 agg.selector(),
-                stringSerializer,
-                longSerializer,
-                longSerializer,
-                stringDeserializer,
-                longDeserializer,
-                longDeserializer,
+                stringSerde,
+                longSerde,
+                longSerde,
                 "cntByCnt"
-        ).to("tagg", stringSerializer, longSerializer);
+        ).to("tagg", stringSerde, longSerde);
 
         return new KafkaStreams(builder, props);
     }
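
Every DSL call in the topology above shrinks the same way: wherever a stateful operation used to take a serializer/deserializer pair per type, it now takes a single Serde. A minimal sketch of the new call shape (the topic names are the smoke test's own; the class name SerdeTopologySketch is mine):

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class SerdeTopologySketch {
        public static void main(String[] args) {
            Serde<String> stringSerde = Serdes.String();
            Serde<Integer> intSerde = Serdes.Integer();

            KStreamBuilder builder = new KStreamBuilder();
            // One Serde per key/value replaces the old serializer+deserializer
            // pairs, halving the argument count on every such call.
            KStream<String, Integer> source = builder.stream(stringSerde, intSerde, "data");
            source.to("echo", stringSerde, intSerde);
        }
    }
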
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
index c0a6f46..1abf88d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
@@ -157,7 +157,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
                 }
 
                 ProducerRecord<byte[], byte[]> record =
-                        new ProducerRecord<>("data", stringSerializer.serialize("", key), integerSerializer.serialize("", value));
+                        new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
 
                 producer.send(record);
 
@@ -233,10 +233,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
                 retryCount = 0;
 
                 for (ConsumerRecord<byte[], byte[]> record : records) {
-                    String key = stringDeserializer.deserialize("", record.key());
+                    String key = stringSerde.deserializer().deserialize("", record.key());
                     switch (record.topic()) {
                         case "echo":
-                            Integer value = integerDeserializer.deserialize("", record.value());
+                            Integer value = intSerde.deserializer().deserialize("", record.value());
                             if (value != null && value == END) {
                                 keys.remove(key);
                                 if (keys.isEmpty()) {
@@ -249,28 +249,28 @@ public class SmokeTestDriver extends SmokeTestUtil {
                             }
                             break;
                         case "min":
-                            min.put(key, integerDeserializer.deserialize("", record.value()));
+                            min.put(key, intSerde.deserializer().deserialize("", record.value()));
                             break;
                         case "max":
-                            max.put(key, integerDeserializer.deserialize("", record.value()));
+                            max.put(key, intSerde.deserializer().deserialize("", record.value()));
                             break;
                         case "dif":
-                            dif.put(key, integerDeserializer.deserialize("", record.value()));
+                            dif.put(key, intSerde.deserializer().deserialize("", record.value()));
                             break;
                         case "sum":
-                            sum.put(key, longDeserializer.deserialize("", record.value()));
+                            sum.put(key, longSerde.deserializer().deserialize("", record.value()));
                             break;
                         case "cnt":
-                            cnt.put(key, longDeserializer.deserialize("", record.value()));
+                            cnt.put(key, longSerde.deserializer().deserialize("", record.value()));
                             break;
                         case "avg":
-                            avg.put(key, doubleDeserializer.deserialize("", record.value()));
+                            avg.put(key, doubleSerde.deserializer().deserialize("", record.value()));
                             break;
                         case "wcnt":
-                            wcnt.put(key, longDeserializer.deserialize("", record.value()));
+                            wcnt.put(key, longSerde.deserializer().deserialize("", record.value()));
                             break;
                         case "tagg":
-                            tagg.put(key, longDeserializer.deserialize("", record.value()));
+                            tagg.put(key, longSerde.deserializer().deserialize("", record.value()));
                             break;
                         default:
                             System.out.println("unknown topic: " + record.topic());

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
index 3f5503f..c5ded5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -17,15 +17,8 @@
 package org.apache.kafka.streams.smoketest;
 
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -36,7 +29,6 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 import java.io.File;
-import java.util.Map;
 
 public class SmokeTestUtil {
 
@@ -128,74 +120,13 @@ public class SmokeTestUtil {
         }
     }
 
-    public static Serializer<String> stringSerializer = new StringSerializer();
+    public static Serde<String> stringSerde = Serdes.String();
 
-    public static Deserializer<String> stringDeserializer = new StringDeserializer();
+    public static Serde<Integer> intSerde = Serdes.Integer();
 
-    public static Serializer<Integer> integerSerializer = new IntegerSerializer();
+    public static Serde<Long> longSerde = Serdes.Long();
 
-    public static Deserializer<Integer> integerDeserializer = new IntegerDeserializer();
-
-    public static Serializer<Long> longSerializer = new LongSerializer();
-
-    public static Deserializer<Long> longDeserializer = new LongDeserializer();
-
-    public static Serializer<Double> doubleSerializer = new Serializer<Double>() {
-
-        @Override
-        public void configure(Map<String, ?> configs, boolean isKey) {
-        }
-
-        @Override
-        public byte[] serialize(String topic, Double data) {
-            if (data == null)
-                return null;
-
-            long bits = Double.doubleToLongBits(data);
-            return new byte[] {
-                (byte) (bits >>> 56),
-                (byte) (bits >>> 48),
-                (byte) (bits >>> 40),
-                (byte) (bits >>> 32),
-                (byte) (bits >>> 24),
-                (byte) (bits >>> 16),
-                (byte) (bits >>> 8),
-                (byte) bits
-            };
-        }
-
-        @Override
-        public void close() {
-        }
-    };
-
-    public static Deserializer<Double> doubleDeserializer = new Deserializer<Double>() {
-
-        @Override
-        public void configure(Map<String, ?> configs, boolean isKey) {
-        }
-
-        @Override
-        public Double deserialize(String topic, byte[] data) {
-            if (data == null)
-                return null;
-            if (data.length != 8) {
-                throw new SerializationException("Size of data received by Deserializer is " +
-                        "not 8");
-            }
-
-            long value = 0;
-            for (byte b : data) {
-                value <<= 8;
-                value |= b & 0xFF;
-            }
-            return Double.longBitsToDouble(value);
-        }
-
-        @Override
-        public void close() {
-        }
-    };
+    public static Serde<Double> doubleSerde = Serdes.Double();
 
     public static File createDir(String path) throws Exception {
         File dir = new File(path);
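
The hand-rolled big-endian Double serializer and deserializer deleted above are replaced by the built-in Serdes.Double() factory, and callers reach the two halves through serializer() and deserializer(), as SmokeTestDriver now does. A tiny round-trip sketch (the topic name and value are illustrative only):

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;

    public class DoubleSerdeSketch {
        public static void main(String[] args) {
            // One factory call replaces ~60 lines of custom Double <-> byte[] code.
            Serde<Double> doubleSerde = Serdes.Double();

            byte[] raw = doubleSerde.serializer().serialize("topic", 3.14);
            Double back = doubleSerde.deserializer().deserialize("topic", raw);
            System.out.println(back); // prints 3.14
        }
    }
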
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index d8b034f..0468f49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -129,58 +130,6 @@ import java.util.Set;
  */
 public class KeyValueStoreTestDriver<K, V> {
 
-    private static <T> Serializer<T> unusableSerializer() {
-        return new Serializer<T>() {
-            @Override
-            public void configure(Map<String, ?> configs, boolean isKey) {
-            }
-
-            @Override
-            public byte[] serialize(String topic, T data) {
-                throw new UnsupportedOperationException("This serializer should not be used");
-            }
-
-            @Override
-            public void close() {
-            }
-        };
-    };
-
-    private static <T> Deserializer<T> unusableDeserializer() {
-        return new Deserializer<T>() {
-            @Override
-            public void configure(Map<String, ?> configs, boolean isKey) {
-            }
-
-            @Override
-            public T deserialize(String topic, byte[] data) {
-                throw new UnsupportedOperationException("This deserializer should not be used");
-            }
-
-            @Override
-            public void close() {
-            }
-        };
-    };
-
-    /**
-     * Create a driver object that will have a {@link #context()} that records messages
-     * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides <em>unusable</em> default key and
-     * value serializers and deserializers. This can be used when the actual serializers and deserializers are supplied to the
-     * store during creation, which should eliminate the need for a store to depend on the ProcessorContext's default key and
-     * value serializers and deserializers.
-     *
-     * @return the test driver; never null
-     */
-    public static <K, V> KeyValueStoreTestDriver<K, V> create() {
-        Serializer<K> keySerializer = unusableSerializer();
-        Deserializer<K> keyDeserializer = unusableDeserializer();
-        Serializer<V> valueSerializer = unusableSerializer();
-        Deserializer<V> valueDeserializer = unusableDeserializer();
-        Serdes<K, V> serdes = new Serdes<K, V>("unexpected", keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
-        return new KeyValueStoreTestDriver<K, V>(serdes);
-    }
-
     /**
      * Create a driver object that will have a {@link #context()} that records messages
      * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides default serializers and
@@ -195,7 +144,7 @@ public class KeyValueStoreTestDriver<K, V> {
      * @return the test driver; never null
      */
     public static <K, V> KeyValueStoreTestDriver<K, V> create(Class<K> keyClass, Class<V> valueClass) {
-        Serdes<K, V> serdes = Serdes.withBuiltinTypes("unexpected", keyClass, valueClass);
+        StateSerdes<K, V> serdes = StateSerdes.withBuiltinTypes("unexpected", keyClass, valueClass);
         return new KeyValueStoreTestDriver<K, V>(serdes);
     }
 
@@ -215,7 +164,9 @@ public class KeyValueStoreTestDriver<K, V> {
                                                              Deserializer<K> keyDeserializer,
                                                              Serializer<V> valueSerializer,
                                                              Deserializer<V> valueDeserializer) {
-        Serdes<K, V> serdes = new Serdes<K, V>("unexpected", keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
+        StateSerdes<K, V> serdes = new StateSerdes<K, V>("unexpected",
+                Serdes.serdeFrom(keySerializer, keyDeserializer),
+                Serdes.serdeFrom(valueSerializer, valueDeserializer));
         return new KeyValueStoreTestDriver<K, V>(serdes);
     }
 
@@ -237,7 +188,7 @@ public class KeyValueStoreTestDriver<K, V> {
     private final RecordCollector recordCollector;
     private File stateDir = null;
 
-    protected KeyValueStoreTestDriver(final Serdes<K, V> serdes) {
+    protected KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
         ByteArraySerializer rawSerializer = new ByteArraySerializer();
         Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
 
@@ -276,13 +227,10 @@ public class KeyValueStoreTestDriver<K, V> {
         Properties props = new Properties();
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, serdes.keySerializer().getClass());
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass());
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass());
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass());
+        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass());
+        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass());
 
-        this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
-                serdes.valueDeserializer(), recordCollector) {
+        this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector) {
             @Override
             public TaskId taskId() {
                 return new TaskId(0, 1);
@@ -328,7 +276,7 @@ public class KeyValueStoreTestDriver<K, V> {
         }
     }
 
-    private void restoreEntries(StateRestoreCallback func, Serdes<K, V> serdes) {
+    private void restoreEntries(StateRestoreCallback func, StateSerdes<K, V> serdes) {
         for (KeyValue<K, V> entry : restorableEntries) {
             if (entry != null) {
                 byte[] rawKey = serdes.rawKey(entry.key);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 46948bd..b44583d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -34,11 +32,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
         StateStoreSupplier supplier;
         if (useContextSerdes) {
-            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
-            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
-            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
-            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
+            supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde()).inMemory().build();
         } else {
             supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().build();
         }
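
Two helpers from the KeyValueStoreTestDriver hunks above are worth calling out: StateSerdes (the renamed streams-local Serdes class, which would otherwise collide with the new org.apache.kafka.common.serialization.Serdes factory) wraps one Serde per side, and Serdes.serdeFrom() bridges any existing serializer/deserializer pair into a Serde. A small sketch of both (the "my-topic" name and the 42 key are illustrative assumptions):

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.state.StateSerdes;

    public class StateSerdesSketch {
        public static void main(String[] args) {
            // Bridge a legacy serializer/deserializer pair into one Serde.
            Serde<String> stringSerde =
                    Serdes.serdeFrom(new StringSerializer(), new StringDeserializer());

            // Or let StateSerdes resolve built-in serdes from the Java types.
            StateSerdes<Integer, String> serdes =
                    StateSerdes.withBuiltinTypes("my-topic", Integer.class, String.class);
            byte[] rawKey = serdes.rawKey(42); // serialize a key with the key serde
        }
    }
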
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index a2b79e5..c301223 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -40,11 +38,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
 
         StateStoreSupplier supplier;
         if (useContextSerdes) {
-            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
-            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
-            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
-            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().maxEntries(10).build();
+            supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde()).inMemory().maxEntries(10).build();
        } else {
            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().maxEntries(10).build();
        }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 8e8f69c..280255a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -35,11 +33,7 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
         StateStoreSupplier supplier;
         if (useContextSerdes) {
-            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
-            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
-            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
-            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).persistent().build();
+            supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde()).persistent().build();
         } else {
             supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).persistent().build();
         }
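
The three store tests above exercise both flavors of the reworked Stores builder: withKeys/withValues now accept either a Serde (here the one taken from the ProcessorContext) or a class literal for a built-in type. A quick sketch mixing the two flavors (the store name and types are arbitrary, and combining a Serde stage with a class stage is my own assumption based on the calls shown in these hunks):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.StateStoreSupplier;
    import org.apache.kafka.streams.state.Stores;

    public class StoresBuilderSketch {
        public static void main(String[] args) {
            StateStoreSupplier supplier = Stores.create("example-store")
                    .withKeys(Serdes.String()) // explicit serde
                    .withValues(Long.class)    // resolved to the built-in Long serde
                    .inMemory()
                    .build();
        }
    }
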
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 5a196ec..ffc97c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -20,15 +20,15 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.WindowStoreUtils;
@@ -51,17 +51,16 @@ import static org.junit.Assert.assertNull;
 
 public class RocksDBWindowStoreTest {
 
-    private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
-    private final ByteArrayDeserializer byteArrayDeserializer = new ByteArrayDeserializer();
+    private final Serde<byte[]> byteArraySerde = Serdes.ByteArray();
     private final String windowName = "window";
     private final int numSegments = 3;
     private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL;
     private final long retentionPeriod = segmentSize * (numSegments - 1);
     private final long windowSize = 3;
-    private final Serdes<Integer, String> serdes = Serdes.withBuiltinTypes("", Integer.class, String.class);
+    private final StateSerdes<Integer, String> serdes = StateSerdes.withBuiltinTypes("", Integer.class, String.class);
 
     @SuppressWarnings("unchecked")
-    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, Serdes<K, V> serdes) {
+    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, StateSerdes<K, V> serdes) {
         StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, true, serdes, null);
 
         WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
@@ -74,7 +73,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -87,7 +86,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             WindowStore<Integer, String> store = createWindowStore(context, serdes);
@@ -170,7 +169,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -183,7 +182,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             WindowStore<Integer, String> store = createWindowStore(context, serdes);
@@ -266,7 +265,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -279,7 +278,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             WindowStore<Integer, String> store = createWindowStore(context, serdes);
@@ -362,7 +361,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -375,7 +374,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             WindowStore<Integer, String> store = createWindowStore(context, serdes);
@@ -421,7 +420,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -434,7 +433,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             WindowStore<Integer, String> store = createWindowStore(context, serdes);
@@ -538,7 +537,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -551,7 +550,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             WindowStore<Integer, String> store = createWindowStore(context, serdes);
@@ -587,7 +586,7 @@ public class RocksDBWindowStoreTest {
         File baseDir2 = Files.createTempDirectory("test").toFile();
         try {
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -600,7 +599,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             WindowStore<Integer, String> store = createWindowStore(context, serdes);
@@ -642,7 +641,7 @@ public class RocksDBWindowStoreTest {
     public void testSegmentMaintenance() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -652,7 +651,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             WindowStore<Integer, String> store = createWindowStore(context, serdes);
@@ -745,7 +744,7 @@ public class RocksDBWindowStoreTest {
     public void testInitialLoading() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -755,7 +754,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             File storeDir = new File(baseDir, windowName);
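
Every hunk in RocksDBWindowStoreTest is the same mechanical substitution: the test keeps a single Serde<byte[]> field and unwraps it wherever an API still wants a raw Serializer, such as MockProducer. A minimal sketch of that unwrapping (taken directly from the call shape in the hunks above):

    import org.apache.kafka.clients.producer.MockProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;

    public class SerdeUnwrapSketch {
        public static void main(String[] args) {
            Serde<byte[]> byteArraySerde = Serdes.ByteArray();
            // A Serde bundles both halves; serializer() and deserializer()
            // hand back the raw pieces for serializer-only APIs.
            Producer<byte[], byte[]> producer = new MockProducer<>(
                    true, byteArraySerde.serializer(), byteArraySerde.serializer());
        }
    }
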
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 5f014ef..9a477df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
 
@@ -44,7 +44,7 @@ public class StoreChangeLoggerTest {
     private final Map<Integer, String> logged = new HashMap<>();
     private final Map<Integer, String> written = new HashMap<>();
 
-    private final ProcessorContext context = new MockProcessorContext(Serdes.withBuiltinTypes(topic, Integer.class, String.class),
+    private final ProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
             new RecordCollector(null) {
                 @SuppressWarnings("unchecked")
                 @Override
@@ -61,7 +61,7 @@ public class StoreChangeLoggerTest {
         }
     );
 
-    private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, Serdes.withBuiltinTypes(topic, Integer.class, String.class), 3, 3);
+    private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), 3, 3);
 
     private final StoreChangeLogger<byte[], byte[]> rawChangeLogger = new RawStoreChangeLogger(topic, context, 3, 3);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index edbcb4a..05713c1 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -18,7 +18,8 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -42,20 +43,20 @@ public class KStreamTestDriver {
     private ProcessorNode currNode;
 
     public KStreamTestDriver(KStreamBuilder builder) {
-        this(builder, null, null, null, null, null);
+        this(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
     }
 
     public KStreamTestDriver(KStreamBuilder builder, File stateDir) {
-        this(builder, stateDir, null, null, null, null);
+        this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
     }
 
     public KStreamTestDriver(KStreamBuilder builder,
                              File stateDir,
-                             Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
-                             Serializer<?> valSerializer, Deserializer<?> valDeserializer) {
+                             Serde<?> keySerde,
+                             Serde<?> valSerde) {
         this.topology = builder.build("X", null);
         this.stateDir = stateDir;
-        this.context = new MockProcessorContext(this, stateDir, keySerializer, keyDeserializer, valSerializer, valDeserializer, new MockRecordCollector());
+        this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, new MockRecordCollector());
 
         for (StateStoreSupplier stateStoreSupplier : topology.stateStoreSuppliers()) {
             StateStore store = stateStoreSupplier.get();
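
With the constructor collapsed from four serializer arguments to two serdes, the no-serde convenience constructors now default to Serdes.ByteArray() instead of null, so a test that never passes serdes still gets usable defaults. A sketch of driving a topology with explicit serdes (the builder setup is elided and the type choices are illustrative):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.test.KStreamTestDriver;

    public class DriverSketch {
        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();
            // ... add sources/processors to the builder here ...
            KStreamTestDriver driver = new KStreamTestDriver(
                    builder, null, Serdes.String(), Serdes.Long());
        }
    }
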
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index b463669..e57e1c7 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -18,16 +18,15 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 
 import java.io.File;
 import java.util.Collections;
@@ -38,10 +37,8 @@ import java.util.Map;
 public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
 
     private final KStreamTestDriver driver;
-    private final Serializer keySerializer;
-    private final Serializer valueSerializer;
-    private final Deserializer keyDeserializer;
-    private final Deserializer valueDeserializer;
+    private final Serde<?> keySerde;
+    private final Serde<?> valSerde;
     private final RecordCollector.Supplier recordCollectorSupplier;
     private final File stateDir;
 
@@ -50,21 +47,15 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     long timestamp = -1L;
 
-    public MockProcessorContext(Serdes<?, ?> serdes, RecordCollector collector) {
-        this(null, null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(), serdes.valueDeserializer(), collector);
-    }
-
-    public MockProcessorContext(Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
-                                Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
-                                RecordCollector collector) {
-        this(null, null, keySerializer, keyDeserializer, valueSerializer, valueDeserializer, collector);
+    public MockProcessorContext(StateSerdes<?, ?> serdes, RecordCollector collector) {
+        this(null, null, serdes.keySerde(), serdes.valueSerde(), collector);
     }
 
     public MockProcessorContext(KStreamTestDriver driver, File stateDir,
-                                Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
-                                Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
+                                Serde<?> keySerde,
+                                Serde<?> valSerde,
                                 final RecordCollector collector) {
-        this(driver, stateDir, keySerializer, keyDeserializer, valueSerializer, valueDeserializer,
+        this(driver, stateDir, keySerde, valSerde,
                 new RecordCollector.Supplier() {
                     @Override
                     public RecordCollector recordCollector() {
@@ -74,15 +65,13 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     }
 
     public MockProcessorContext(KStreamTestDriver driver, File stateDir,
-                                Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
-                                Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
+                                Serde<?> keySerde,
+                                Serde<?> valSerde,
                                 RecordCollector.Supplier collectorSupplier) {
         this.driver = driver;
         this.stateDir = stateDir;
-        this.keySerializer = keySerializer;
-        this.valueSerializer = valueSerializer;
-        this.keyDeserializer = keyDeserializer;
-        this.valueDeserializer = valueDeserializer;
+        this.keySerde = keySerde;
+        this.valSerde = valSerde;
         this.recordCollectorSupplier = collectorSupplier;
     }
 
@@ -111,23 +100,13 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public Serializer<?> keySerializer() {
-        return keySerializer;
-    }
-
-    @Override
-    public Serializer<?> valueSerializer() {
-        return valueSerializer;
-    }
-
-    @Override
-    public Deserializer<?> keyDeserializer() {
-        return keyDeserializer;
+    public Serde<?> keySerde() {
+        return this.keySerde;
     }
 
     @Override
-    public Deserializer<?> valueDeserializer() {
-        return valueDeserializer;
+    public Serde<?> valueSerde() {
+        return this.valSerde;
    }
 
     @Override
