Repository: kafka Updated Branches: refs/heads/trunk 51c652c40 -> 713a67fdd
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index ae3808e..cbbe848 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -183,23 +183,27 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired); } - + + @SuppressWarnings("deprecation") @Override public void print() { print(defaultKeyValueMapper, null, null, this.name); } + @SuppressWarnings("deprecation") @Override public void print(final String label) { print(defaultKeyValueMapper, null, null, label); } + @SuppressWarnings("deprecation") @Override public void print(final Serde<K> keySerde, final Serde<V> valSerde) { print(defaultKeyValueMapper, keySerde, valSerde, this.name); } + @SuppressWarnings("deprecation") @Override public void print(final Serde<K> keySerde, final Serde<V> valSerde, @@ -207,17 +211,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V print(defaultKeyValueMapper, keySerde, valSerde, label); } + @SuppressWarnings("deprecation") @Override public void print(final KeyValueMapper<? super K, ? super V, String> mapper) { print(mapper, null, null, this.name); } + @SuppressWarnings("deprecation") @Override public void print(final KeyValueMapper<? super K, ? super V, String> mapper, final String label) { print(mapper, null, null, label); } + @SuppressWarnings("deprecation") @Override public void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, @@ -225,6 +232,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V print(mapper, keySerde, valSerde, this.name); } + @SuppressWarnings("deprecation") @Override public void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, @@ -243,17 +251,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V builder.internalTopologyBuilder.addProcessor(name, printedInternal.build(this.name), this.name); } + @SuppressWarnings("deprecation") @Override public void writeAsText(final String filePath) { writeAsText(filePath, this.name, null, null, defaultKeyValueMapper); } + @SuppressWarnings("deprecation") @Override public void writeAsText(final String filePath, final String label) { writeAsText(filePath, label, null, null, defaultKeyValueMapper); } + @SuppressWarnings("deprecation") @Override public void writeAsText(final String filePath, final Serde<K> keySerde, @@ -261,6 +272,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V writeAsText(filePath, this.name, keySerde, valSerde, defaultKeyValueMapper); } + @SuppressWarnings("deprecation") @Override public void writeAsText(final String filePath, final String label, @@ -269,12 +281,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V writeAsText(filePath, label, keySerde, valSerde, defaultKeyValueMapper); } + @SuppressWarnings("deprecation") @Override public void writeAsText(final String filePath, final KeyValueMapper<? super K, ? super V, String> mapper) { writeAsText(filePath, this.name, null, null, mapper); } + @SuppressWarnings("deprecation") @Override public void writeAsText(final String filePath, final String label, @@ -282,6 +296,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V writeAsText(filePath, label, null, null, mapper); } + @SuppressWarnings("deprecation") @Override public void writeAsText(final String filePath, final Serde<K> keySerde, @@ -290,6 +305,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V writeAsText(filePath, this.name, keySerde, valSerde, mapper); } + @SuppressWarnings("deprecation") @Override public void writeAsText(final String filePath, final String label, @@ -368,6 +384,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V return new KStreamImpl<>(builder, name, allSourceNodes, requireRepartitioning); } + @SuppressWarnings("deprecation") @Override public KStream<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, @@ -404,6 +421,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V return new KStreamImpl<>(builder, name, sourceNodes, repartitionRequired); } + @SuppressWarnings("deprecation") @Override public KStream<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, @@ -411,6 +429,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V return through(topic, Produced.with(keySerde, valSerde)); } + @SuppressWarnings("deprecation") @Override public KStream<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, final String topic) { @@ -427,12 +446,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V to(topic, Produced.<K, V>with(null, null, null)); } + @SuppressWarnings("deprecation") @Override public void to(final StreamPartitioner<? super K, ? super V> partitioner, final String topic) { to(topic, Produced.streamPartitioner(partitioner)); } + @SuppressWarnings("deprecation") @Override public void to(final Serde<K> keySerde, final Serde<V> valSerde, @@ -440,7 +461,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V to(topic, Produced.with(keySerde, valSerde)); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Override public void to(final Serde<K> keySerde, final Serde<V> valSerde, @@ -459,6 +480,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } + @SuppressWarnings("unchecked") private void to(final String topic, final ProducedInternal<K, V> produced) { final String name = builder.newName(SINK_NAME); final Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer(); @@ -513,6 +535,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } } + @SuppressWarnings("deprecation") @Override public <V1, R> KStream<K, R> join(final KStream<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, @@ -540,6 +563,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V new KStreamImplJoin(false, false)); } + @SuppressWarnings("deprecation") @Override public <V1, R> KStream<K, R> outerJoin(final KStream<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, @@ -641,6 +665,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V return sourceName; } + @SuppressWarnings("deprecation") @Override public <V1, R> KStream<K, R> leftJoin(final KStream<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, @@ -696,6 +721,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } } + @SuppressWarnings("deprecation") @Override public <V1, R> KStream<K, R> join(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, @@ -732,6 +758,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V return new KStreamImpl<>(builder, name, sourceNodes, false); } + @SuppressWarnings("unchecked") private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, final boolean leftJoin) { @@ -768,6 +795,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } } + @SuppressWarnings("deprecation") + @Override public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, final Serde<K> keySerde, @@ -795,6 +824,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V true); } + @SuppressWarnings("deprecation") @Override public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super K, ? super V, K1> selector, final Serde<K1> keySerde, @@ -820,6 +850,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } + @SuppressWarnings("deprecation") @Override public KGroupedStream<K, V> groupByKey(final Serde<K> keySerde, final Serde<V> valSerde) { @@ -849,6 +880,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V this.rightOuter = rightOuter; } + @SuppressWarnings("unchecked") public <K1, R, V1, V2> KStream<K1, R> join(final KStream<K1, V1> lhs, final KStream<K1, V2> other, final ValueJoiner<? super V1, ? super V2, ? extends R> joiner, http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index b322415..4f26767 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -51,6 +51,7 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> { private WindowStore<K, V2> otherWindow; + @SuppressWarnings("unchecked") @Override public void init(ProcessorContext context) { super.init(context); http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index a42db0b..db8de1a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -33,7 +33,6 @@ import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; @@ -142,8 +141,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return this.queryableStoreName; } + @SuppressWarnings("deprecation") private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate, - final StateStoreSupplier<KeyValueStore> storeSupplier, + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier, final boolean isFilterNot) { Objects.requireNonNull(predicate, "predicate can't be null"); String name = builder.newName(FILTER_NAME); @@ -196,19 +196,21 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return doFilter(predicate, new MaterializedInternal<>(materialized), false); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName) { - StateStoreSupplier<KeyValueStore> storeSupplier = null; + org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier = null; if (queryableStoreName != null) { storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName); } return doFilter(predicate, storeSupplier, false); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); return doFilter(predicate, storeSupplier, false); } @@ -226,26 +228,29 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return doFilter(predicate, new MaterializedInternal<>(materialized), true); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName) { - StateStoreSupplier<KeyValueStore> storeSupplier = null; + org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier = null; if (queryableStoreName != null) { storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName); } return doFilter(predicate, storeSupplier, true); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); return doFilter(predicate, storeSupplier, true); } + @SuppressWarnings("deprecation") private <V1> KTable<K, V1> doMapValues(final ValueMapper<? super V, ? extends V1> mapper, final Serde<V1> valueSerde, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(mapper); String name = builder.newName(MAPVALUES_NAME); String internalStoreName = null; @@ -284,21 +289,23 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, true); } + @SuppressWarnings("deprecation") @Override public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper, final Serde<V1> valueSerde, final String queryableStoreName) { - StateStoreSupplier<KeyValueStore> storeSupplier = null; + org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier = null; if (queryableStoreName != null) { storeSupplier = keyValueStore(this.keySerde, valueSerde, queryableStoreName); } return doMapValues(mapper, valueSerde, storeSupplier); } + @SuppressWarnings("deprecation") @Override public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper, final Serde<V1> valueSerde, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); return doMapValues(mapper, valueSerde, storeSupplier); } @@ -322,7 +329,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, print(keySerde, valSerde, this.name); } - @SuppressWarnings("deprecation") + @SuppressWarnings({"unchecked", "deprecation"}) @Override public void print(final Serde<K> keySerde, final Serde<V> valSerde, @@ -356,7 +363,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, /** * @throws TopologyException if file is not found */ - @SuppressWarnings("deprecation") + @SuppressWarnings({"unchecked", "deprecation"}) @Override public void writeAsText(final String filePath, final String label, @@ -390,6 +397,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, @@ -408,12 +416,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, queryableStoreName != null)); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, final StreamPartitioner<? super K, ? super V> partitioner, final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); to(keySerde, valSerde, partitioner, topic); @@ -421,6 +430,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return builder.table(topic, consumed, storeSupplier); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, @@ -428,6 +438,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, final String topic) { return through(keySerde, valSerde, partitioner, topic, (String) null); } + + @SuppressWarnings("deprecation") @Override public KTable<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, @@ -436,15 +448,17 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return through(keySerde, valSerde, null, topic, queryableStoreName); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); return through(keySerde, valSerde, null, topic, storeSupplier); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, @@ -452,6 +466,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return through(keySerde, valSerde, null, topic, (String) null); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, final String topic, @@ -459,49 +474,57 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return through(null, null, partitioner, topic, queryableStoreName); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); return through(null, null, partitioner, topic, storeSupplier); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, final String topic) { return through(null, null, partitioner, topic, (String) null); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> through(final String topic, final String queryableStoreName) { return through(null, null, null, topic, queryableStoreName); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> through(final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); return through(null, null, null, topic, storeSupplier); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> through(final String topic) { return through(null, null, null, topic, (String) null); } + @SuppressWarnings("deprecation") @Override public void to(final String topic) { to(null, null, null, topic); } + @SuppressWarnings("deprecation") @Override public void to(final StreamPartitioner<? super K, ? super V> partitioner, final String topic) { to(null, null, partitioner, topic); } + @SuppressWarnings("deprecation") @Override public void to(final Serde<K> keySerde, final Serde<V> valSerde, @@ -509,6 +532,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, this.toStream().to(keySerde, valSerde, null, topic); } + @SuppressWarnings("deprecation") @Override public void to(final Serde<K> keySerde, final Serde<V> valSerde, @@ -552,6 +576,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return doJoin(other, joiner, new MaterializedInternal<>(materialized), false, false); } + @SuppressWarnings("deprecation") @Override public <V1, R> KTable<K, R> join(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, @@ -560,10 +585,11 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return doJoin(other, joiner, false, false, joinSerde, queryableStoreName); } + @SuppressWarnings("deprecation") @Override public <V1, R> KTable<K, R> join(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); return doJoin(other, joiner, false, false, storeSupplier); } @@ -581,6 +607,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return doJoin(other, joiner, new MaterializedInternal<>(materialized), true, true); } + @SuppressWarnings("deprecation") @Override public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, @@ -589,10 +616,11 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return doJoin(other, joiner, true, true, joinSerde, queryableStoreName); } + @SuppressWarnings("deprecation") @Override public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); return doJoin(other, joiner, true, true, storeSupplier); } @@ -614,6 +642,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, false); } + @SuppressWarnings("deprecation") @Override public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, @@ -622,15 +651,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return doJoin(other, joiner, true, false, joinSerde, queryableStoreName); } + @SuppressWarnings("deprecation") @Override public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); return doJoin(other, joiner, true, false, storeSupplier); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, final boolean leftOuter, @@ -640,16 +670,18 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, Objects.requireNonNull(other, "other can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); - final StateStoreSupplier storeSupplier = queryableStoreName == null ? null : keyValueStore(this.keySerde, joinSerde, queryableStoreName); + final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier + = queryableStoreName == null ? null : keyValueStore(this.keySerde, joinSerde, queryableStoreName); return doJoin(other, joiner, leftOuter, rightOuter, storeSupplier); } + @SuppressWarnings({"unchecked", "deprecation"}) private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, final boolean leftOuter, final boolean rightOuter, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(other, "other can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); final String joinMergeName = builder.newName(MERGE_NAME); @@ -668,6 +700,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return result; } + @SuppressWarnings("unchecked") private <VO, VR> KTable<K, VR> doJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materialized, @@ -692,6 +725,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return result; } + @SuppressWarnings("unchecked") private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, final boolean leftOuter, @@ -740,6 +774,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return new KTableImpl<>(builder, joinMergeName, joinMerge, allSourceNodes, internalQueryableName, internalQueryableName != null); } + @SuppressWarnings("deprecation") @Override public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector, final Serde<K1> keySerde, http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java index ffb3697..b4e5d44 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java @@ -43,6 +43,7 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> { // Default constructor needed by Kafka public WindowedSerializer() {} + @SuppressWarnings("unchecked") @Override public void configure(Map<String, ?> configs, boolean isKey) { if (inner == null) { @@ -76,12 +77,12 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> { inner.close(); } - public byte[] serializeBaseKey(String topic, Windowed<T> data) { + byte[] serializeBaseKey(String topic, Windowed<T> data) { return inner.serialize(topic, data.key()); } // Only for testing - public Serializer<T> innerSerializer() { + Serializer<T> innerSerializer() { return inner; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index ad0f236..66dfa27 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -50,6 +50,7 @@ import java.util.regex.Pattern; * * @deprecated use {@link Topology} instead */ +@SuppressWarnings("unchecked") @Deprecated public class TopologyBuilder { http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index c24686e..52465ed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -46,7 +46,7 @@ public abstract class AbstractTask implements Task { final ProcessorTopology topology; final ProcessorStateManager stateMgr; final Set<TopicPartition> partitions; - final Consumer consumer; + final Consumer<byte[], byte[]> consumer; final String logPrefix; final boolean eosEnabled; final Logger log; http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 06405ef..f2cbf51 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -23,7 +23,6 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates; @@ -179,10 +178,10 @@ public class InternalTopologyBuilder { private static class StateStoreSupplierFactory extends AbstractStateStoreFactory { @SuppressWarnings("deprecation") - private final StateStoreSupplier supplier; + private final org.apache.kafka.streams.processor.StateStoreSupplier supplier; @SuppressWarnings("deprecation") - StateStoreSupplierFactory(final StateStoreSupplier<?> supplier) { + StateStoreSupplierFactory(final org.apache.kafka.streams.processor.StateStoreSupplier<?> supplier) { super(supplier.name(), supplier.loggingEnabled(), supplier instanceof WindowStoreSupplier, @@ -196,6 +195,7 @@ public class InternalTopologyBuilder { return supplier.get(); } + @SuppressWarnings("deprecation") @Override public long retentionPeriod() { if (!isWindowStore()) { @@ -498,7 +498,7 @@ public class InternalTopologyBuilder { } @SuppressWarnings("deprecation") - public final void addStateStore(final StateStoreSupplier supplier, + public final void addStateStore(final org.apache.kafka.streams.processor.StateStoreSupplier supplier, final String... processorNames) { Objects.requireNonNull(supplier, "supplier can't be null"); if (stateFactories.containsKey(supplier.name())) { @@ -531,7 +531,7 @@ public class InternalTopologyBuilder { } @SuppressWarnings("deprecation") - public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier, + public final void addGlobalStore(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier, final String sourceName, final TimestampExtractor timestampExtractor, final Deserializer keyDeserializer, @@ -925,6 +925,7 @@ public class InternalTopologyBuilder { return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), storeToChangelogTopic, new ArrayList<>(globalStateStores.values())); } + @SuppressWarnings("unchecked") private void buildSinkNode(final Map<String, ProcessorNode> processorMap, final Map<String, SinkNode> topicSinkMap, final SinkNodeFactory sinkNodeFactory, http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java index b395d42..47cd61d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java @@ -52,6 +52,7 @@ public class QuickUnion<T> { return current; } + @SuppressWarnings("unchecked") public void unite(T id1, T... idList) { for (T id2 : idList) { unitePair(id1, id2); http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java index b254bb8..03bbceb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java @@ -112,7 +112,7 @@ public class StreamsMetricsImpl implements StreamsMetrics { private Map<String, String> constructTags(final String scopeName, final String entityName, final String... tags) { - List<String> updatedTagList = new ArrayList(Arrays.asList(tags)); + List<String> updatedTagList = new ArrayList<>(Arrays.asList(tags)); updatedTagList.add(scopeName + "-id"); updatedTagList.add(entityName); return tagMap(updatedTagList.toArray(new String[updatedTagList.size()])); http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/Stores.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 0173c1d..c9c44af 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier; import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier; @@ -271,7 +270,7 @@ public class Stores { } @Override - public StateStoreSupplier build() { + public org.apache.kafka.streams.processor.StateStoreSupplier build() { log.trace("Defining InMemory Store name={} capacity={} logged={}", name, capacity, logged); if (capacity < Integer.MAX_VALUE) { return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged, logConfig); @@ -335,7 +334,7 @@ public class Stores { } @Override - public StateStoreSupplier build() { + public org.apache.kafka.streams.processor.StateStoreSupplier build() { log.trace("Defining RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged); if (sessionWindows) { return new RocksDBSessionStoreSupplier<>(name, retentionPeriod, keySerde, valueSerde, logged, logConfig, cachingEnabled); @@ -535,6 +534,7 @@ public class Stores { * @param <K> the type of keys * @param <V> the type of values */ + @Deprecated public interface InMemoryKeyValueFactory<K, V> { /** * Limits the in-memory key-value store to hold a maximum number of entries. The default is {@link Integer#MAX_VALUE}, which is @@ -567,7 +567,7 @@ public class Stores { * Return the instance of StateStoreSupplier of new key-value store. * @return the state store supplier; never null */ - StateStoreSupplier build(); + org.apache.kafka.streams.processor.StateStoreSupplier build(); } /** @@ -576,6 +576,7 @@ public class Stores { * @param <K> the type of keys * @param <V> the type of values */ + @Deprecated public interface PersistentKeyValueFactory<K, V> { /** @@ -614,11 +615,12 @@ public class Stores { * @return the factory to create a persistent key-value store */ PersistentKeyValueFactory<K, V> enableCaching(); + /** * Return the instance of StateStoreSupplier of new key-value store. * @return the key-value store; never null */ - StateStoreSupplier build(); + org.apache.kafka.streams.processor.StateStoreSupplier build(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java index 92e8ce0..10d0fe2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java @@ -19,12 +19,11 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreSupplier; import java.util.Map; - -abstract class AbstractStoreSupplier<K, V, T extends StateStore> implements StateStoreSupplier<T> { +@Deprecated +abstract class AbstractStoreSupplier<K, V, T extends StateStore> implements org.apache.kafka.streams.processor.StateStoreSupplier<T> { protected final String name; protected final Serde<K> keySerde; protected final Serde<V> valueSerde; http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java index 6d1e6dd..f955421 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java @@ -34,6 +34,7 @@ import java.util.Map; * * @see org.apache.kafka.streams.state.Stores#create(String) */ +@Deprecated public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> { public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) { http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java index c93bacb..0f897ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java @@ -29,6 +29,7 @@ import java.util.Map; * @param <V> The value type * */ +@Deprecated public class InMemoryLRUCacheStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> { private final int capacity; http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java index 7ac8bab..a6ff8d5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java @@ -95,6 +95,7 @@ public class MeteredKeyValueBytesStore<K, V> extends WrappedStateStore.AbstractS }, time); } + @SuppressWarnings("unchecked") @Override public void init(ProcessorContext context, StateStore root) { this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 8c10987..8d9065c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -57,6 +57,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt this.time = time; } + @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context, final StateStore root) { //noinspection unchecked http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 49d8050..20c7c43 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -57,7 +57,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto this.valueSerde = valueSerde; } - + @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context, final StateStore root) { this.context = context; http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java index d629c1c..4b233f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java @@ -29,7 +29,7 @@ import java.util.Map; * @param <V> the type of values * @see org.apache.kafka.streams.state.Stores#create(String) */ - +@Deprecated public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> { private final KeyValueStoreBuilder<K, V> builder; http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java index f5432dc..1552f7d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java @@ -30,6 +30,7 @@ import java.util.Map; * * @see org.apache.kafka.streams.state.Stores#create(String) */ +@Deprecated public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, SessionStore> implements WindowStoreSupplier<SessionStore> { static final int NUM_SEGMENTS = 3; http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java index b899f5e..2a82f79 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java @@ -30,7 +30,7 @@ import java.util.Map; * * @see org.apache.kafka.streams.state.Stores#create(String) */ - +@Deprecated public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, WindowStore> implements WindowStoreSupplier<WindowStore> { public static final int MIN_SEGMENTS = 2; private final long retentionPeriod; http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java index ffeb7d8..b9b7181 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java @@ -21,8 +21,6 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.SessionStore; -import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS; - public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSupplier { private final String name; private final long retentionPeriod; @@ -38,12 +36,14 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli return name; } + @SuppressWarnings("deprecation") @Override public SessionStore<Bytes, byte[]> get() { - final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name, - retentionPeriod, - NUM_SEGMENTS, - new SessionKeySchema()); + final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore( + name, + retentionPeriod, + org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS, + new SessionKeySchema()); return new RocksDBSessionStore<>(segmented, Serdes.Bytes(), Serdes.ByteArray()); } @@ -52,8 +52,11 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli return "rocksdb-session"; } + @SuppressWarnings("deprecation") @Override public long segmentIntervalMs() { - return Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS); + return Segments.segmentInterval( + retentionPeriod, + org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java index a0500b6..e873435 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java @@ -20,8 +20,6 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; -import static org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS; - public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier { private final String name; private final long retentionPeriod; @@ -29,13 +27,14 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier private final long windowSize; private final boolean retainDuplicates; + @SuppressWarnings("deprecation") public RocksDbWindowBytesStoreSupplier(final String name, final long retentionPeriod, final int segments, final long windowSize, final boolean retainDuplicates) { - if (segments < MIN_SEGMENTS) { - throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS); + if (segments < org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS) { + throw new IllegalArgumentException("numSegments must be >= " + org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS); } this.name = name; this.retentionPeriod = retentionPeriod; http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java index ad24c25..3495352 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java @@ -17,14 +17,14 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreSupplier; /** - * A windowed state store supplier that extends the {@link StateStoreSupplier} interface. + * A windowed state store supplier that extends the {@link org.apache.kafka.streams.processor.StateStoreSupplier} interface. * * @param <T> State store type */ -public interface WindowStoreSupplier<T extends StateStore> extends StateStoreSupplier<T> { +@Deprecated +public interface WindowStoreSupplier<T extends StateStore> extends org.apache.kafka.streams.processor.StateStoreSupplier<T> { // window retention period in milli-second long retentionPeriod(); http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java index a4b3118..100a11c 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java @@ -27,7 +27,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Map; -@SuppressWarnings("deprecation") +@Deprecated public class MockStateStoreSupplier implements StateStoreSupplier { private String name; private boolean persistent;
