http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 884933b..b293496 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -17,9 +17,8 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; @@ -93,9 +92,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-"; - private static final LongSerializer LONG_SERIALIZER = new LongSerializer(); - private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); - public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) { super(topology, name, sourceNodes); } @@ -199,18 +195,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public KStream<K, V> through(String topic, - Serializer<K> keySerializer, - Serializer<V> valSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V> valDeserializer) { - to(topic, keySerializer, valSerializer); + Serde<K> keySerde, + Serde<V> valSerde) { + to(topic, keySerde, valSerde); - return topology.stream(keyDeserializer, valDeserializer, topic); + return topology.stream(keySerde, valSerde); } @Override public KStream<K, V> through(String topic) { - return through(topic, null, null, null, null); + return through(topic, null, null); } @Override @@ -220,10 +214,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @SuppressWarnings("unchecked") @Override - public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) { + public void to(String topic, Serde<K> keySerde, Serde<V> valSerde) { String name = topology.newName(SINK_NAME); StreamPartitioner<K, V> streamPartitioner = null; + Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer(); + Serializer<V> valSerializer = keySerde == null ? null : valSerde.serializer(); + if (keySerializer != null && keySerializer instanceof WindowedSerializer) { WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer; streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer); @@ -265,16 +262,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, - Serializer<K> keySerializer, - Serializer<V> thisValueSerializer, - Serializer<V1> otherValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V> thisValueDeserializer, - Deserializer<V1> otherValueDeserializer) { - - return join(other, joiner, windows, - keySerializer, thisValueSerializer, otherValueSerializer, - keyDeserializer, thisValueDeserializer, otherValueDeserializer, false); + Serde<K> keySerde, + Serde<V> thisValueSerde, + Serde<V1> otherValueSerde) { + + return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, false); } @Override @@ -282,16 +274,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, - Serializer<K> keySerializer, - Serializer<V> thisValueSerializer, - Serializer<V1> otherValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V> thisValueDeserializer, - Deserializer<V1> otherValueDeserializer) { - - return join(other, joiner, windows, - keySerializer, thisValueSerializer, otherValueSerializer, - keyDeserializer, thisValueDeserializer, otherValueDeserializer, true); + Serde<K> keySerde, + Serde<V> thisValueSerde, + Serde<V1> otherValueSerde) { + + return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, true); } @SuppressWarnings("unchecked") @@ -299,26 +286,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, - Serializer<K> keySerializer, - Serializer<V> thisValueSerializer, - Serializer<V1> otherValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V> thisValueDeserializer, - Deserializer<V1> otherValueDeserializer, + Serde<K> keySerde, + Serde<V> thisValueSerde, + Serde<V1> otherValueSerde, boolean outer) { Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); StateStoreSupplier thisWindow = Stores.create(windows.name() + "-this") - .withKeys(keySerializer, keyDeserializer) - .withValues(thisValueSerializer, thisValueDeserializer) + .withKeys(keySerde) + .withValues(thisValueSerde) .persistent() .windowed(windows.maintainMs(), windows.segments, true) .build(); StateStoreSupplier otherWindow = Stores.create(windows.name() + "-other") - .withKeys(keySerializer, keyDeserializer) - .withValues(otherValueSerializer, otherValueDeserializer) + .withKeys(keySerde) + .withValues(otherValueSerde) .persistent() .windowed(windows.maintainMs(), windows.segments, true) .build(); @@ -354,16 +338,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, - Serializer<K> keySerializer, - Serializer<V1> otherValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V1> otherValueDeserializer) { + Serde<K> keySerde, + Serde<V1> otherValueSerde) { Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); StateStoreSupplier otherWindow = Stores.create(windows.name() + "-other") - .withKeys(keySerializer, keyDeserializer) - .withValues(otherValueSerializer, otherValueDeserializer) + .withKeys(keySerde) + .withValues(otherValueSerde) .persistent() .windowed(windows.maintainMs(), windows.segments, true) .build(); @@ -397,18 +379,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, Windows<W> windows, - Serializer<K> keySerializer, - Serializer<V> aggValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V> aggValueDeserializer) { + Serde<K> keySerde, + Serde<V> aggValueSerde) { String reduceName = topology.newName(REDUCE_NAME); KStreamWindowReduce<K, V, W> reduceSupplier = new KStreamWindowReduce<>(windows, windows.name(), reducer); StateStoreSupplier reduceStore = Stores.create(windows.name()) - .withKeys(keySerializer, keyDeserializer) - .withValues(aggValueSerializer, aggValueDeserializer) + .withKeys(keySerde) + .withValues(aggValueSerde) .persistent() .windowed(windows.maintainMs(), windows.segments, false) .build(); @@ -423,10 +403,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public KTable<K, V> reduceByKey(Reducer<V> reducer, - Serializer<K> keySerializer, - Serializer<V> aggValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V> aggValueDeserializer, + Serde<K> keySerde, + Serde<V> aggValueSerde, String name) { String reduceName = topology.newName(REDUCE_NAME); @@ -434,8 +412,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V KStreamReduce<K, V> reduceSupplier = new KStreamReduce<>(name, reducer); StateStoreSupplier reduceStore = Stores.create(name) - .withKeys(keySerializer, keyDeserializer) - .withValues(aggValueSerializer, aggValueDeserializer) + .withKeys(keySerde) + .withValues(aggValueSerde) .persistent() .build(); @@ -451,18 +429,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, Windows<W> windows, - Serializer<K> keySerializer, - Serializer<T> aggValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<T> aggValueDeserializer) { + Serde<K> keySerde, + Serde<T> aggValueSerde) { String aggregateName = topology.newName(AGGREGATE_NAME); KStreamAggProcessorSupplier<K, Windowed<K>, V, T> aggregateSupplier = new KStreamWindowAggregate<>(windows, windows.name(), initializer, aggregator); StateStoreSupplier aggregateStore = Stores.create(windows.name()) - .withKeys(keySerializer, keyDeserializer) - .withValues(aggValueSerializer, aggValueDeserializer) + .withKeys(keySerde) + .withValues(aggValueSerde) .persistent() .windowed(windows.maintainMs(), windows.segments, false) .build(); @@ -478,10 +454,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, - Serializer<K> keySerializer, - Serializer<T> aggValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<T> aggValueDeserializer, + Serde<K> keySerde, + Serde<T> aggValueSerde, String name) { String aggregateName = topology.newName(AGGREGATE_NAME); @@ -489,8 +463,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V KStreamAggProcessorSupplier<K, K, V, T> aggregateSupplier = new KStreamAggregate<>(name, initializer, aggregator); StateStoreSupplier aggregateStore = Stores.create(name) - .withKeys(keySerializer, keyDeserializer) - .withValues(aggValueSerializer, aggValueDeserializer) + .withKeys(keySerde) + .withValues(aggValueSerde) .persistent() .build(); @@ -504,8 +478,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer) { + Serde<K> keySerde) { return this.aggregateByKey( new Initializer<Long>() { @Override @@ -518,13 +491,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V public Long apply(K aggKey, V value, Long aggregate) { return aggregate + 1L; } - }, windows, keySerializer, LONG_SERIALIZER, keyDeserializer, LONG_DESERIALIZER); + }, windows, keySerde, Serdes.Long()); } @Override - public KTable<K, Long> countByKey(Serializer<K> keySerializer, - Deserializer<K> keyDeserializer, - String name) { + public KTable<K, Long> countByKey(Serde<K> keySerde, + String name) { return this.aggregateByKey( new Initializer<Long>() { @Override @@ -537,6 +509,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V public Long apply(K aggKey, V value, Long aggregate) { return aggregate + 1L; } - }, keySerializer, LONG_SERIALIZER, keyDeserializer, LONG_DESERIALIZER, name); + }, keySerde, Serdes.Long(), name); } }
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index d63fcc8..496a476 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -17,10 +17,8 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KStream; @@ -77,15 +75,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-"; - private static final LongSerializer LONG_SERIALIZER = new LongSerializer(); - private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); - public final ProcessorSupplier<?, ?> processorSupplier; - private final Serializer<K> keySerializer; - private final Serializer<V> valSerializer; - private final Deserializer<K> keyDeserializer; - private final Deserializer<V> valDeserializer; + private final Serde<K> keySerde; + private final Serde<V> valSerde; private boolean sendOldValues = false; @@ -93,23 +86,19 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, String name, ProcessorSupplier<?, ?> processorSupplier, Set<String> sourceNodes) { - this(topology, name, processorSupplier, sourceNodes, null, null, null, null); + this(topology, name, processorSupplier, sourceNodes, null, null); } public KTableImpl(KStreamBuilder topology, String name, ProcessorSupplier<?, ?> processorSupplier, Set<String> sourceNodes, - Serializer<K> keySerializer, - Serializer<V> valSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V> valDeserializer) { + Serde<K> keySerde, + Serde<V> valSerde) { super(topology, name, sourceNodes); this.processorSupplier = processorSupplier; - this.keySerializer = keySerializer; - this.valSerializer = valSerializer; - this.keyDeserializer = keyDeserializer; - this.valDeserializer = valDeserializer; + this.keySerde = keySerde; + this.valSerde = valSerde; } @Override @@ -143,18 +132,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, @Override public KTable<K, V> through(String topic, - Serializer<K> keySerializer, - Serializer<V> valSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V> valDeserializer) { - to(topic, keySerializer, valSerializer); + Serde<K> keySerde, + Serde<V> valSerde) { + to(topic, keySerde, valSerde); - return topology.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic); + return topology.table(keySerde, valSerde, topic); } @Override public KTable<K, V> through(String topic) { - return through(topic, null, null, null, null); + return through(topic, null, null); } @Override @@ -163,8 +150,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, } @Override - public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) { - this.toStream().to(topic, keySerializer, valSerializer); + public void to(String topic, Serde<K> keySerde, Serde<V> valSerde) { + this.toStream().to(topic, keySerde, valSerde); } @Override @@ -255,12 +242,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, Aggregator<K1, V1, T> add, Aggregator<K1, V1, T> remove, KeyValueMapper<K, V, KeyValue<K1, V1>> selector, - Serializer<K1> keySerializer, - Serializer<V1> valueSerializer, - Serializer<T> aggValueSerializer, - Deserializer<K1> keyDeserializer, - Deserializer<V1> valueDeserializer, - Deserializer<T> aggValueDeserializer, + Serde<K1> keySerde, + Serde<V1> valueSerde, + Serde<T> aggValueSerde, String name) { String selectName = topology.newName(SELECT_NAME); @@ -270,16 +254,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, String topic = name + REPARTITION_TOPIC_SUFFIX; - ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerializer); - ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); + ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerde.serializer()); + ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueSerde.deserializer()); KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector); ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, initializer, add, remove); StateStoreSupplier aggregateStore = Stores.create(name) - .withKeys(keySerializer, keyDeserializer) - .withValues(aggValueSerializer, aggValueDeserializer) + .withKeys(keySerde) + .withValues(aggValueSerde) .persistent() .build(); @@ -289,10 +273,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, // send the aggregate key-value pairs to the intermediate topic for partitioning topology.addInternalTopic(topic); - topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, selectName); + topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, selectName); // read the intermediate topic - topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic); + topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic); // aggregate the values with the aggregator and local store topology.addProcessor(aggregateName, aggregateSupplier, sourceName); @@ -304,10 +288,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, @Override public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> selector, - Serializer<K1> keySerializer, - Serializer<V> valueSerializer, - Deserializer<K1> keyDeserializer, - Deserializer<V> valueDeserializer, + Serde<K1> keySerde, + Serde<V> valueSerde, String name) { return this.aggregate( new Initializer<Long>() { @@ -332,17 +314,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return new KeyValue<>(selector.apply(key, value), value); } }, - keySerializer, valueSerializer, LONG_SERIALIZER, keyDeserializer, valueDeserializer, LONG_DESERIALIZER, name); + keySerde, valueSerde, Serdes.Long(), name); } @Override public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer, Reducer<V1> removeReducer, KeyValueMapper<K, V, KeyValue<K1, V1>> selector, - Serializer<K1> keySerializer, - Serializer<V1> valueSerializer, - Deserializer<K1> keyDeserializer, - Deserializer<V1> valueDeserializer, + Serde<K1> keySerde, + Serde<V1> valueSerde, String name) { String selectName = topology.newName(SELECT_NAME); @@ -352,16 +332,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, String topic = name + REPARTITION_TOPIC_SUFFIX; - ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerializer); - ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); + ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerde.serializer()); + ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueSerde.deserializer()); KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector); ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableReduce<>(name, addReducer, removeReducer); StateStoreSupplier aggregateStore = Stores.create(name) - .withKeys(keySerializer, keyDeserializer) - .withValues(valueSerializer, valueDeserializer) + .withKeys(keySerde) + .withValues(valueSerde) .persistent() .build(); @@ -371,10 +351,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, // send the aggregate key-value pairs to the intermediate topic for partitioning topology.addInternalTopic(topic); - topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, selectName); + topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, selectName); // read the intermediate topic - topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic); + topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic); // aggregate the values with the aggregator and local store topology.addProcessor(reduceName, aggregateSupplier, sourceName); @@ -421,7 +401,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, synchronized (source) { if (!source.isMaterialized()) { StateStoreSupplier storeSupplier = - new KTableStoreSupplier<>(source.topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null); + new KTableStoreSupplier<>(source.topic, keySerde, valSerde, null); // mark this state as non internal hence it is read directly from a user topic topology.addStateStore(storeSupplier, false, name); source.materialize(); http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java index ffd5cf0..af3c0d7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java @@ -17,14 +17,13 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.internals.MeteredKeyValueStore; import org.apache.kafka.streams.state.internals.RocksDBStore; -import org.apache.kafka.streams.state.Serdes; +import org.apache.kafka.streams.state.StateSerdes; /** * A KTable storage. It stores all entries in a local RocksDB database. @@ -35,15 +34,15 @@ import org.apache.kafka.streams.state.Serdes; public class KTableStoreSupplier<K, V> implements StateStoreSupplier { private final String name; - private final Serdes<K, V> serdes; + private final StateSerdes<K, V> serdes; private final Time time; protected KTableStoreSupplier(String name, - Serializer<K> keySerializer, Deserializer<K> keyDeserializer, - Serializer<V> valSerializer, Deserializer<V> valDeserializer, + Serde<K> keySerde, + Serde<V> valSerde, Time time) { this.name = name; - this.serdes = new Serdes<>(name, keySerializer, keyDeserializer, valSerializer, valDeserializer); + this.serdes = new StateSerdes<>(name, keySerde, valSerde); this.time = time; } http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index e9d5252..fdcff19 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -17,8 +17,7 @@ package org.apache.kafka.streams.processor; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.StreamsMetrics; import java.io.File; @@ -43,32 +42,18 @@ public interface ProcessorContext { TaskId taskId(); /** - * Returns the key serializer + * Returns the default key serde * * @return the key serializer */ - Serializer<?> keySerializer(); + Serde<?> keySerde(); /** - * Returns the value serializer + * Returns the default value serde * * @return the value serializer */ - Serializer<?> valueSerializer(); - - /** - * Returns the key deserializer - * - * @return the key deserializer - */ - Deserializer<?> keyDeserializer(); - - /** - * Returns the value deserializer - * - * @return the value deserializer - */ - Deserializer<?> valueDeserializer(); + Serde<?> valueSerde(); /** * Returns the state directory for the partition. http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index ab7122b..7f5d645 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -194,8 +194,8 @@ public class TopologyBuilder { /** * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes. - * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and - * {@link org.apache.kafka.streams.StreamsConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the + * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and + * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. * * @param name the unique name of the source used to reference this node when @@ -214,10 +214,10 @@ public class TopologyBuilder { * @param name the unique name of the source used to reference this node when * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source - * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the + * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration} * @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source - * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the + * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration} * @param topics the name of one or more Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null @@ -242,8 +242,8 @@ public class TopologyBuilder { /** * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic. - * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and - * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the + * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and + * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. * * @param name the unique name of the sink @@ -262,8 +262,8 @@ public class TopologyBuilder { /** * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic, using * the supplied partitioner. - * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and - * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the + * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and + * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. * <p> * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among @@ -293,10 +293,10 @@ public class TopologyBuilder { * @param name the unique name of the sink * @param topic the name of the Kafka topic to which this sink should write its messages * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink - * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the + * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration} * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink - * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the + * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration} * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume * and write to its topic @@ -316,10 +316,10 @@ public class TopologyBuilder { * @param name the unique name of the sink * @param topic the name of the Kafka topic to which this sink should write its messages * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink - * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the + * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration} * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink - * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the + * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration} * @param partitioner the function that should be used to determine the partition for each message processed by the sink * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index f6e43d0..888b89e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -17,11 +17,10 @@ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; -import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateRestoreCallback; @@ -37,10 +36,8 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S private final RecordCollector collector; private final ProcessorStateManager stateMgr; - private final Serializer<?> keySerializer; - private final Serializer<?> valSerializer; - private final Deserializer<?> keyDeserializer; - private final Deserializer<?> valDeserializer; + private final Serde<?> keySerde; + private final Serde<?> valSerde; private boolean initialized; @@ -57,10 +54,8 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S this.collector = collector; this.stateMgr = stateMgr; - this.keySerializer = config.keySerializer(); - this.valSerializer = config.valueSerializer(); - this.keyDeserializer = config.keyDeserializer(); - this.valDeserializer = config.valueDeserializer(); + this.keySerde = config.keySerde(); + this.valSerde = config.valueSerde(); this.initialized = false; } @@ -89,23 +84,13 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S } @Override - public Serializer<?> keySerializer() { - return this.keySerializer; - } - - @Override - public Serializer<?> valueSerializer() { - return this.valSerializer; - } - - @Override - public Deserializer<?> keyDeserializer() { - return this.keyDeserializer; + public Serde<?> keySerde() { + return this.keySerde; } @Override - public Deserializer<?> valueDeserializer() { - return this.valDeserializer; + public Serde<?> valueSerde() { + return this.valSerde; } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 7ab59ee..ffc72fd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -49,8 +49,8 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> { @Override public void init(ProcessorContext context) { this.context = context; - if (this.keySerializer == null) this.keySerializer = (Serializer<K>) context.keySerializer(); - if (this.valSerializer == null) this.valSerializer = (Serializer<V>) context.valueSerializer(); + if (this.keySerializer == null) this.keySerializer = (Serializer<K>) context.keySerde().serializer(); + if (this.valSerializer == null) this.valSerializer = (Serializer<V>) context.valueSerde().serializer(); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index fa4afaf..1868c1b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -47,8 +47,8 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> { this.context = context; // if serializers are null, get the default ones from the context - if (this.keyDeserializer == null) this.keyDeserializer = (Deserializer<K>) context.keyDeserializer(); - if (this.valDeserializer == null) this.valDeserializer = (Deserializer<V>) context.valueDeserializer(); + if (this.keyDeserializer == null) this.keyDeserializer = (Deserializer<K>) context.keySerde().deserializer(); + if (this.valDeserializer == null) this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer(); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index 0bcae18..3ad06e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -17,8 +17,7 @@ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.ProcessorContext; @@ -35,10 +34,8 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup private final StreamsMetrics metrics; private final ProcessorStateManager stateMgr; - private final Serializer<?> keySerializer; - private final Serializer<?> valSerializer; - private final Deserializer<?> keyDeserializer; - private final Deserializer<?> valDeserializer; + private final Serde<?> keySerde; + private final Serde<?> valSerde; private boolean initialized; @@ -52,10 +49,8 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup this.metrics = metrics; this.stateMgr = stateMgr; - this.keySerializer = config.keySerializer(); - this.valSerializer = config.valueSerializer(); - this.keyDeserializer = config.keyDeserializer(); - this.valDeserializer = config.valueDeserializer(); + this.keySerde = config.keySerde(); + this.valSerde = config.valueSerde(); this.initialized = false; } @@ -84,23 +79,13 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup } @Override - public Serializer<?> keySerializer() { - return this.keySerializer; + public Serde<?> keySerde() { + return this.keySerde; } @Override - public Serializer<?> valueSerializer() { - return this.valSerializer; - } - - @Override - public Deserializer<?> keyDeserializer() { - return this.keyDeserializer; - } - - @Override - public Deserializer<?> valueDeserializer() { - return this.valDeserializer; + public Serde<?> valueSerde() { + return this.valSerde; } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 491c812..e1a518d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -100,7 +100,6 @@ public class StreamThread extends Thread { private long lastClean; private long lastCommit; - private long recordsProcessed; private Throwable rebalanceException = null; private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords; @@ -201,7 +200,6 @@ public class StreamThread extends Thread { this.lastClean = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment this.lastCommit = time.milliseconds(); - this.recordsProcessed = 0; this.time = time; this.sensors = new StreamsMetricsImpl(metrics); http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java deleted file mode 100644 index e925312..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state; - -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; - -/** - * Factory for creating serializers / deserializers for state stores in Kafka Streams. - * - * @param <K> key type of serdes - * @param <V> value type of serdes - */ -public final class Serdes<K, V> { - - public static <K, V> Serdes<K, V> withBuiltinTypes(String topic, Class<K> keyClass, Class<V> valueClass) { - Serializer<K> keySerializer = serializer(keyClass); - Deserializer<K> keyDeserializer = deserializer(keyClass); - Serializer<V> valueSerializer = serializer(valueClass); - Deserializer<V> valueDeserializer = deserializer(valueClass); - return new Serdes<>(topic, keySerializer, keyDeserializer, valueSerializer, valueDeserializer); - } - - @SuppressWarnings("unchecked") - static <T> Serializer<T> serializer(Class<T> type) { - if (String.class.isAssignableFrom(type)) return (Serializer<T>) new StringSerializer(); - if (Integer.class.isAssignableFrom(type)) return (Serializer<T>) new IntegerSerializer(); - if (Long.class.isAssignableFrom(type)) return (Serializer<T>) new LongSerializer(); - if (byte[].class.isAssignableFrom(type)) return (Serializer<T>) new ByteArraySerializer(); - throw new IllegalArgumentException("Unknown class for built-in serializer"); - } - - @SuppressWarnings("unchecked") - static <T> Deserializer<T> deserializer(Class<T> type) { - if (String.class.isAssignableFrom(type)) return (Deserializer<T>) new StringDeserializer(); - if (Integer.class.isAssignableFrom(type)) return (Deserializer<T>) new IntegerDeserializer(); - if (Long.class.isAssignableFrom(type)) return (Deserializer<T>) new LongDeserializer(); - if (byte[].class.isAssignableFrom(type)) return (Deserializer<T>) new ByteArrayDeserializer(); - throw new IllegalArgumentException("Unknown class for built-in serializer"); - } - - private final String topic; - private Serializer<K> keySerializer; - private Serializer<V> valueSerializer; - private Deserializer<K> keyDeserializer; - private Deserializer<V> valueDeserializer; - - /** - * Create a context for serialization using the specified serializers and deserializers which - * <em>must</em> match the key and value types used as parameters for this object. - * - * @param topic the name of the topic - * @param keySerializer the serializer for keys; may be null - * @param keyDeserializer the deserializer for keys; may be null - * @param valueSerializer the serializer for values; may be null - * @param valueDeserializer the deserializer for values; may be null - */ - @SuppressWarnings("unchecked") - public Serdes(String topic, - Serializer<K> keySerializer, Deserializer<K> keyDeserializer, - Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) { - this.topic = topic; - - if (keySerializer == null) - throw new NullPointerException(); - if (keyDeserializer == null) - throw new NullPointerException(); - if (valueSerializer == null) - throw new NullPointerException(); - if (valueDeserializer == null) - throw new NullPointerException(); - - this.keySerializer = keySerializer; - this.keyDeserializer = keyDeserializer; - this.valueSerializer = valueSerializer; - this.valueDeserializer = valueDeserializer; - } - - public Deserializer<K> keyDeserializer() { - return keyDeserializer; - } - - public Serializer<K> keySerializer() { - return keySerializer; - } - - public Deserializer<V> valueDeserializer() { - return valueDeserializer; - } - - public Serializer<V> valueSerializer() { - return valueSerializer; - } - - public String topic() { - return topic; - } - - public K keyFrom(byte[] rawKey) { - return keyDeserializer.deserialize(topic, rawKey); - } - - public V valueFrom(byte[] rawValue) { - return valueDeserializer.deserialize(topic, rawValue); - } - - public byte[] rawKey(K key) { - return keySerializer.serialize(topic, key); - } - - public byte[] rawValue(V value) { - return valueSerializer.serialize(topic, value); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java new file mode 100644 index 0000000..1a41a16 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; + +/** + * Factory for creating serializers / deserializers for state stores in Kafka Streams. + * + * @param <K> key type of serdes + * @param <V> value type of serdes + */ +public final class StateSerdes<K, V> { + + public static <K, V> StateSerdes<K, V> withBuiltinTypes(String topic, Class<K> keyClass, Class<V> valueClass) { + return new StateSerdes<>(topic, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass)); + } + + private final String topic; + private final Serde<K> keySerde; + private final Serde<V> valueSerde; + + /** + * Create a context for serialization using the specified serializers and deserializers which + * <em>must</em> match the key and value types used as parameters for this object; the state changelog topic + * is provided to bind this serde factory to, so that future calls for serialize / deserialize do not + * need to provide the topic name any more. + * + * @param topic the name of the topic + * @param keySerde the serde for keys; cannot be null + * @param valueSerde the serde for values; cannot be null + */ + @SuppressWarnings("unchecked") + public StateSerdes(String topic, + Serde<K> keySerde, + Serde<V> valueSerde) { + this.topic = topic; + + if (keySerde == null) + throw new IllegalArgumentException("key serde cannot be null"); + if (valueSerde == null) + throw new IllegalArgumentException("value serde cannot be null"); + + this.keySerde = keySerde; + this.valueSerde = valueSerde; + } + + public Serde<K> keySerde() { + return keySerde; + } + + public Serde<V> valueSerde() { + return valueSerde; + } + + public Deserializer<K> keyDeserializer() { + return keySerde.deserializer(); + } + + public Serializer<K> keySerializer() { + return keySerde.serializer(); + } + + public Deserializer<V> valueDeserializer() { + return valueSerde.deserializer(); + } + + public Serializer<V> valueSerializer() { + return valueSerde.serializer(); + } + + public String topic() { + return topic; + } + + public K keyFrom(byte[] rawKey) { + return keySerde.deserializer().deserialize(topic, rawKey); + } + + public V valueFrom(byte[] rawValue) { + return valueSerde.deserializer().deserialize(topic, rawValue); + } + + public byte[] rawKey(K key) { + return keySerde.serializer().serialize(topic, key); + } + + public byte[] rawValue(V value) { + return valueSerde.serializer().serialize(topic, value); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/Stores.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index e803832..33df13f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -16,22 +16,16 @@ */ package org.apache.kafka.streams.state; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier; import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; +import java.nio.ByteBuffer; + /** * Factory for creating state stores in Kafka Streams. */ @@ -46,13 +40,12 @@ public class Stores { public static StoreFactory create(final String name) { return new StoreFactory() { @Override - public <K> ValueFactory<K> withKeys(final Serializer<K> keySerializer, final Deserializer<K> keyDeserializer) { + public <K> ValueFactory<K> withKeys(final Serde<K> keySerde) { return new ValueFactory<K>() { @Override - public <V> KeyValueFactory<K, V> withValues(final Serializer<V> valueSerializer, - final Deserializer<V> valueDeserializer) { - final Serdes<K, V> serdes = - new Serdes<>(name, keySerializer, keyDeserializer, valueSerializer, valueDeserializer); + public <V> KeyValueFactory<K, V> withValues(final Serde<V> valueSerde) { + final StateSerdes<K, V> serdes = + new StateSerdes<>(name, keySerde, valueSerde); return new KeyValueFactory<K, V>() { @Override public InMemoryKeyValueFactory<K, V> inMemory() { @@ -116,7 +109,7 @@ public class Stores { * @return the interface used to specify the type of values; never null */ public ValueFactory<String> withStringKeys() { - return withKeys(new StringSerializer(), new StringDeserializer()); + return withKeys(Serdes.String()); } /** @@ -125,7 +118,7 @@ public class Stores { * @return the interface used to specify the type of values; never null */ public ValueFactory<Integer> withIntegerKeys() { - return withKeys(new IntegerSerializer(), new IntegerDeserializer()); + return withKeys(Serdes.Integer()); } /** @@ -134,7 +127,25 @@ public class Stores { * @return the interface used to specify the type of values; never null */ public ValueFactory<Long> withLongKeys() { - return withKeys(new LongSerializer(), new LongDeserializer()); + return withKeys(Serdes.Long()); + } + + /** + * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Double}s. + * + * @return the interface used to specify the type of values; never null + */ + public ValueFactory<Double> withDoubleKeys() { + return withKeys(Serdes.Double()); + } + + /** + * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link ByteBuffer}. + * + * @return the interface used to specify the type of values; never null + */ + public ValueFactory<ByteBuffer> withByteBufferKeys() { + return withKeys(Serdes.ByteBuffer()); } /** @@ -143,30 +154,26 @@ public class Stores { * @return the interface used to specify the type of values; never null */ public ValueFactory<byte[]> withByteArrayKeys() { - return withKeys(new ByteArraySerializer(), new ByteArrayDeserializer()); + return withKeys(Serdes.ByteArray()); } /** - * Begin to create a {@link KeyValueStore} by specifying the keys will be either {@link String}, {@link Integer}, - * {@link Long}, or {@code byte[]}. + * Begin to create a {@link KeyValueStore} by specifying the keys. * - * @param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serializers and - * deserializers (e.g., {@code String.class}, {@code Integer.class}, {@code Long.class}, or - * {@code byte[].class}) + * @param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serdes * @return the interface used to specify the type of values; never null */ public <K> ValueFactory<K> withKeys(Class<K> keyClass) { - return withKeys(Serdes.serializer(keyClass), Serdes.deserializer(keyClass)); + return withKeys(Serdes.serdeFrom(keyClass)); } /** * Begin to create a {@link KeyValueStore} by specifying the serializer and deserializer for the keys. * - * @param keySerializer the serializer for keys; may not be null - * @param keyDeserializer the deserializer for keys; may not be null + * @param keySerde the serialization factory for keys; may not be null * @return the interface used to specify the type of values; never null */ - public abstract <K> ValueFactory<K> withKeys(Serializer<K> keySerializer, Deserializer<K> keyDeserializer); + public abstract <K> ValueFactory<K> withKeys(Serde<K> keySerde); } /** @@ -181,7 +188,7 @@ public class Stores { * @return the interface used to specify the remaining key-value store options; never null */ public KeyValueFactory<K, String> withStringValues() { - return withValues(new StringSerializer(), new StringDeserializer()); + return withValues(Serdes.String()); } /** @@ -190,7 +197,7 @@ public class Stores { * @return the interface used to specify the remaining key-value store options; never null */ public KeyValueFactory<K, Integer> withIntegerValues() { - return withValues(new IntegerSerializer(), new IntegerDeserializer()); + return withValues(Serdes.Integer()); } /** @@ -199,7 +206,25 @@ public class Stores { * @return the interface used to specify the remaining key-value store options; never null */ public KeyValueFactory<K, Long> withLongValues() { - return withValues(new LongSerializer(), new LongDeserializer()); + return withValues(Serdes.Long()); + } + + /** + * Use {@link Double} values. + * + * @return the interface used to specify the remaining key-value store options; never null + */ + public KeyValueFactory<K, Double> withDoubleValues() { + return withValues(Serdes.Double()); + } + + /** + * Use {@link ByteBuffer} for values. + * + * @return the interface used to specify the remaining key-value store options; never null + */ + public KeyValueFactory<K, ByteBuffer> withByteBufferValues() { + return withValues(Serdes.ByteBuffer()); } /** @@ -208,30 +233,26 @@ public class Stores { * @return the interface used to specify the remaining key-value store options; never null */ public KeyValueFactory<K, byte[]> withByteArrayValues() { - return withValues(new ByteArraySerializer(), new ByteArrayDeserializer()); + return withValues(Serdes.ByteArray()); } /** - * Use values of the specified type, which must be either {@link String}, {@link Integer}, {@link Long}, or {@code byte[]} - * . + * Use values of the specified type. * - * @param valueClass the class for the values, which must be one of the types for which Kafka has built-in serializers and - * deserializers (e.g., {@code String.class}, {@code Integer.class}, {@code Long.class}, or - * {@code byte[].class}) + * @param valueClass the class for the values, which must be one of the types for which Kafka has built-in serdes * @return the interface used to specify the remaining key-value store options; never null */ public <V> KeyValueFactory<K, V> withValues(Class<V> valueClass) { - return withValues(Serdes.serializer(valueClass), Serdes.deserializer(valueClass)); + return withValues(Serdes.serdeFrom(valueClass)); } /** * Use the specified serializer and deserializer for the values. * - * @param valueSerializer the serializer for value; may not be null - * @param valueDeserializer the deserializer for values; may not be null + * @param valueSerde the serialization factory for values; may not be null * @return the interface used to specify the remaining key-value store options; never null */ - public abstract <V> KeyValueFactory<K, V> withValues(Serializer<V> valueSerializer, Deserializer<V> valueDeserializer); + public abstract <V> KeyValueFactory<K, V> withValues(Serde<V> valueSerde); } /** @@ -240,7 +261,7 @@ public class Stores { * @param <K> the type of keys * @param <V> the type of values */ - public static interface KeyValueFactory<K, V> { + public interface KeyValueFactory<K, V> { /** * Keep all key-value entries in-memory, although for durability all entries are recorded in a Kafka topic that can be * read to restore the entries if they are lost. @@ -264,7 +285,7 @@ public class Stores { * @param <K> the type of keys * @param <V> the type of values */ - public static interface InMemoryKeyValueFactory<K, V> { + public interface InMemoryKeyValueFactory<K, V> { /** * Limits the in-memory key-value store to hold a maximum number of entries. The default is {@link Integer#MAX_VALUE}, which is * equivalent to not placing a limit on the number of entries. @@ -288,7 +309,7 @@ public class Stores { * @param <K> the type of keys * @param <V> the type of values */ - public static interface PersistentKeyValueFactory<K, V> { + public interface PersistentKeyValueFactory<K, V> { /** * Set the persistent store as a windowed key-value store http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java index c6bbb23..66e1338 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java @@ -25,11 +25,11 @@ public class WindowStoreUtils { public static final int TIMESTAMP_SIZE = 8; public static final int SEQNUM_SIZE = 4; - public static final Serdes<byte[], byte[]> INNER_SERDES = Serdes.withBuiltinTypes("", byte[].class, byte[].class); + public static final StateSerdes<byte[], byte[]> INNER_SERDES = StateSerdes.withBuiltinTypes("", byte[].class, byte[].class); @SuppressWarnings("unchecked") public static final KeyValueIterator<byte[], byte[]>[] NO_ITERATORS = (KeyValueIterator<byte[], byte[]>[]) new KeyValueIterator[0]; - public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, Serdes<K, ?> serdes) { + public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) { byte[] serializedKey = serdes.rawKey(key); ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE); @@ -40,7 +40,7 @@ public class WindowStoreUtils { return buf.array(); } - public static <K> K keyFromBinaryKey(byte[] binaryKey, Serdes<K, ?> serdes) { + public static <K> K keyFromBinaryKey(byte[] binaryKey, StateSerdes<K, ?> serdes) { byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE]; System.arraycopy(binaryKey, 0, bytes, 0, bytes.length); http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java index d25faa8..32116dd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java @@ -22,20 +22,20 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.Serdes; +import org.apache.kafka.streams.state.StateSerdes; import java.util.List; public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> { private final KeyValueStore<K, V> inner; - private final Serdes<K, V> serdes; + private final StateSerdes<K, V> serdes; private final String storeName; private StoreChangeLogger<K, V> changeLogger; private StoreChangeLogger.ValueGetter<K, V> getter; - public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, final Serdes<K, V> serdes) { + public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, final StateSerdes<K, V> serdes) { this.storeName = storeName; this.inner = inner; this.serdes = serdes; http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java index b96a103..4054d68 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java @@ -25,7 +25,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.Serdes; +import org.apache.kafka.streams.state.StateSerdes; import java.util.Iterator; import java.util.List; @@ -45,9 +45,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier { private final String name; private final Time time; - private final Serdes<K, V> serdes; + private final StateSerdes<K, V> serdes; - public InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) { + public InMemoryKeyValueStoreSupplier(String name, StateSerdes<K, V> serdes, Time time) { this.name = name; this.time = time; this.serdes = serdes; @@ -67,7 +67,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier { private final NavigableMap<K, V> map; private boolean loggingEnabled = false; - private Serdes<K, V> serdes = null; + private StateSerdes<K, V> serdes = null; public MemoryStore(String name) { super(); @@ -75,7 +75,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier { this.map = new TreeMap<>(); } - public KeyValueStore<K, V> enableLogging(Serdes<K, V> serdes) { + public KeyValueStore<K, V> enableLogging(StateSerdes<K, V> serdes) { this.loggingEnabled = true; this.serdes = serdes; http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java index 9b7936a..1c2241f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java @@ -19,7 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; -import org.apache.kafka.streams.state.Serdes; +import org.apache.kafka.streams.state.StateSerdes; /** * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries. @@ -32,10 +32,10 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier { private final String name; private final int capacity; - private final Serdes<K, V> serdes; + private final StateSerdes<K, V> serdes; private final Time time; - public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) { + public InMemoryLRUCacheStoreSupplier(String name, int capacity, StateSerdes<K, V> serdes, Time time) { this.name = name; this.capacity = capacity; this.serdes = serdes; http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index bd03f03..a5aaa06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.Serdes; +import org.apache.kafka.streams.state.StateSerdes; import java.util.HashSet; import java.util.LinkedHashMap; @@ -43,7 +43,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { protected EldestEntryRemovalListener<K, V> listener; private boolean loggingEnabled = false; - private Serdes<K, V> serdes = null; + private StateSerdes<K, V> serdes = null; // this is used for extended MemoryNavigableLRUCache only public MemoryLRUCache() {} @@ -69,7 +69,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { }; } - public KeyValueStore<K, V> enableLogging(Serdes<K, V> serdes) { + public KeyValueStore<K, V> enableLogging(StateSerdes<K, V> serdes) { this.loggingEnabled = true; this.serdes = serdes; http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java index 3a4c351..ec10c3f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java @@ -20,7 +20,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; -import org.apache.kafka.streams.state.Serdes; +import org.apache.kafka.streams.state.StateSerdes; /** * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database. @@ -33,10 +33,10 @@ import org.apache.kafka.streams.state.Serdes; public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier { private final String name; - private final Serdes<K, V> serdes; + private final StateSerdes<K, V> serdes; private final Time time; - public RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) { + public RocksDBKeyValueStoreSupplier(String name, StateSerdes<K, V> serdes, Time time) { this.name = name; this.serdes = serdes; this.time = time; http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index c295aea..3045856 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.Serdes; +import org.apache.kafka.streams.state.StateSerdes; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.CompactionStyle; @@ -68,7 +68,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { private final FlushOptions fOptions; private ProcessorContext context; - private Serdes<K, V> serdes; + private StateSerdes<K, V> serdes; protected File dbDir; private RocksDB db; @@ -92,11 +92,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { return this; } - public RocksDBStore(String name, Serdes<K, V> serdes) { + public RocksDBStore(String name, StateSerdes<K, V> serdes) { this(name, DB_FILE_DIR, serdes); } - public RocksDBStore(String name, String parentDir, Serdes<K, V> serdes) { + public RocksDBStore(String name, String parentDir, StateSerdes<K, V> serdes) { this.name = name; this.parentDir = parentDir; this.serdes = serdes; @@ -399,9 +399,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> { private final RocksIterator iter; - private final Serdes<K, V> serdes; + private final StateSerdes<K, V> serdes; - public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) { + public RocksDbIterator(RocksIterator iter, StateSerdes<K, V> serdes) { this.iter = iter; this.serdes = serdes; } @@ -463,7 +463,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { private final Comparator<byte[]> comparator = new LexicographicComparator(); byte[] rawToKey; - public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes, + public RocksDBRangeIterator(RocksIterator iter, StateSerdes<K, V> serdes, K from, K to) { super(iter, serdes); iter.seek(serdes.rawKey(from)); http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index b1605a3..61c2e5e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -25,7 +25,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.Serdes; +import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.WindowStoreUtils; @@ -61,15 +61,15 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { } private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> { - private final Serdes<?, V> serdes; + private final StateSerdes<?, V> serdes; private final KeyValueIterator<byte[], byte[]>[] iterators; private int index = 0; - RocksDBWindowStoreIterator(Serdes<?, V> serdes) { + RocksDBWindowStoreIterator(StateSerdes<?, V> serdes) { this(serdes, WindowStoreUtils.NO_ITERATORS); } - RocksDBWindowStoreIterator(Serdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) { + RocksDBWindowStoreIterator(StateSerdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) { this.serdes = serdes; this.iterators = iterators; } @@ -114,7 +114,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { private final long segmentInterval; private final boolean retainDuplicates; private final Segment[] segments; - private final Serdes<K, V> serdes; + private final StateSerdes<K, V> serdes; private final SimpleDateFormat formatter; private final StoreChangeLogger.ValueGetter<byte[], byte[]> getter; @@ -125,7 +125,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { private boolean loggingEnabled = false; private StoreChangeLogger<byte[], byte[]> changeLogger = null; - public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes) { + public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, StateSerdes<K, V> serdes) { this.name = name; // The segment interval must be greater than MIN_SEGMENT_INTERVAL
