This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9774635bfdc MINOR: update Kafka Streams `Topology` JavaDocs (#18778)
9774635bfdc is described below

commit 9774635bfdc22a490c09148b88fcd33c6597f90b
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Feb 5 20:24:14 2025 -0800

    MINOR: update Kafka Streams `Topology` JavaDocs (#18778)
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../java/org/apache/kafka/streams/Topology.java    | 1095 ++++++++------------
 .../apache/kafka/streams/internals/ApiUtils.java   |    9 +-
 .../streams/kstream/internals/KStreamImpl.java     |    6 +-
 .../kstream/internals/graph/TableSourceNode.java   |    5 +-
 .../internals/InternalTopologyBuilder.java         |  246 ++---
 .../streams/processor/internals/SinkNode.java      |    4 +-
 .../org/apache/kafka/streams/TopologyTest.java     |    2 -
 .../streams/kstream/internals/KStreamImplTest.java |   26 +-
 .../internals/InternalTopologyBuilderTest.java     |    6 +-
 9 files changed, 555 insertions(+), 844 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java 
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 9a25afd4350..01abf4a8b68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -30,26 +30,24 @@ import 
org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.SinkNode;
-import org.apache.kafka.streams.processor.internals.SourceNode;
 import 
org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier;
+import org.apache.kafka.streams.query.StateQueryRequest;
 import org.apache.kafka.streams.state.StoreBuilder;
 
+import java.util.Objects;
 import java.util.Set;
 import java.util.regex.Pattern;
 
 /**
- * A logical representation of a {@link ProcessorTopology}.
- * A topology is an acyclic graph of sources, processors, and sinks.
- * A {@link SourceNode source} is a node in the graph that consumes one or 
more Kafka topics and forwards them to its
+ * A logical representation of a {@code ProcessorTopology}.
+ * A topology is a graph of sources, processors, and sinks.
+ * A {@code SourceNode} is a node in the graph that consumes one or more Kafka 
topics and forwards them to its
  * successor nodes.
- * A {@link Processor processor} is a node in the graph that receives input 
records from upstream nodes, processes the
- * records, and optionally forwarding new records to one or all of its 
downstream nodes.
- * Finally, a {@link SinkNode sink} is a node in the graph that receives 
records from upstream nodes and writes them to
+ * A {@link Processor} is a node in the graph that receives input records from 
upstream nodes, processes the
+ * records, and optionally forwards new records to one, multiple, or all of 
its downstream nodes.
+ * Finally, a {@code SinkNode} is a node in the graph that receives records 
from upstream nodes and writes them to
  * a Kafka topic.
- * A {@code Topology} allows you to construct an acyclic graph of these nodes, 
and then passed into a new
+ * A {@code Topology} allows you to construct a graph of these nodes, which is then 
passed into a new
  * {@link KafkaStreams} instance that will then {@link KafkaStreams#start() 
begin consuming, processing, and producing
  * records}.
  */
@@ -95,17 +93,42 @@ public class Topology {
     }
 
     /**
-     * Add a new source that consumes the named topics and forward the records 
to child processor and/or sink nodes.
-     * The source will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value 
deserializer} specified in the
-     * {@link StreamsConfig stream configuration}.
-     * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
+     * Add a source that consumes the named topics and forwards the records to 
child
+     * {@link #addProcessor(String, ProcessorSupplier, String...) processors} 
and
+     * {@link #addSink(String, String, String...) sinks}.
+     *
+     * <p>The source will use the default values from {@link StreamsConfig} for
+     * <ul>
+     *   <li>{@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG key 
deserializer}</li>
+     *   <li>{@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG value 
deserializer}</li>
+     *   <li>{@link 
org.apache.kafka.clients.consumer.ConsumerConfig#AUTO_OFFSET_RESET_CONFIG 
auto.offset.reset}</li>
+     *   <li>{@link StreamsConfig#DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG 
timestamp extractor}</li>
+     * </ul>
+     *
+     * If you want to specify a source-specific {@link 
org.apache.kafka.streams.AutoOffsetReset auto.offset.reset
+     * strategy}, {@link TimestampExtractor}, or key/value {@link 
Deserializer}, use the corresponding overloaded
+     * {@code addSource(...)} method.
+     *
+     * @param name
+     *        the unique name of the source used to reference this node when 
adding
+     *        {@link #addProcessor(String, ProcessorSupplier, String...) 
processor} or
+     *        {@link #addSink(String, String, String...) sink} children
+     * @param topics
+     *        the name of one or more Kafka topics that this source is to 
consume
      *
-     * @param name the unique name of the source used to reference this node 
when
-     * {@link #addProcessor(String, ProcessorSupplier, String...) adding 
processor children}.
-     * @param topics the name of one or more Kafka topics that this source is 
to consume
      * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
+     *
+     * @throws TopologyException
+     *         if the provided source name is not unique,
+     *         no topics are specified, or
+     *         a topic has already been registered by another source,
+     *         {@link #addReadOnlyStateStore(StoreBuilder, String, 
Deserializer, Deserializer, String, String, ProcessorSupplier) read-only state 
store}, or
+     *         {@link #addGlobalStore(StoreBuilder, String, Deserializer, 
Deserializer, String, String, ProcessorSupplier) global state store}
+     * @throws NullPointerException
+     *         if {@code name} or {@code topics} is {@code null}, or
+     *         {@code topics} contains a {@code null} topic
+     *
+     * @see #addSource(String, Pattern)
      */
     public synchronized Topology addSource(final String name,
                                            final String... topics) {
@@ -114,18 +137,9 @@ public class Topology {
     }
 
     /**
-     * Add a new source that consumes from topics matching the given pattern
-     * and forward the records to child processor and/or sink nodes.
-     * The source will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value 
deserializer} specified in the
-     * {@link StreamsConfig stream configuration}.
-     * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
+     * See {@link #addSource(String, String...)}.
      *
-     * @param name the unique name of the source used to reference this node 
when
-     * {@link #addProcessor(String, ProcessorSupplier, String...) adding 
processor children}.
-     * @param topicPattern regular expression pattern to match Kafka topics 
that this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
+     * <p>Takes a {@link Pattern} (cannot be {@code null}) to match topics to 
consume from, instead of a list of topic names.
      */
     public synchronized Topology addSource(final String name,
                                            final Pattern topicPattern) {
@@ -134,18 +148,6 @@ public class Topology {
     }
 
     /**
-     * Add a new source that consumes the named topics and forward the records 
to child processor and/or sink nodes.
-     * The source will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value 
deserializer} specified in the
-     * {@link StreamsConfig stream configuration}.
-     * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
-     *
-     * @param offsetReset the auto offset reset policy to use for this source 
if no committed offsets found; acceptable values earliest or latest
-     * @param name the unique name of the source used to reference this node 
when
-     * {@link #addProcessor(String, ProcessorSupplier, String...) adding 
processor children}.
-     * @param topics the name of one or more Kafka topics that this source is 
to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
      * @deprecated Since 4.0. Use {@link 
#addSource(org.apache.kafka.streams.AutoOffsetReset, String, String...)} 
instead.
      */
     @Deprecated
@@ -157,14 +159,7 @@ public class Topology {
     }
 
     /**
-     * Adds a new source that consumes the specified topics and forwards the 
records to child processor and/or sink nodes.
-     * The source will use the specified {@link 
org.apache.kafka.streams.AutoOffsetReset offset reset policy} if no committed 
offsets are found.
-     *
-     * @param offsetReset the auto offset reset policy to use for this source 
if no committed offsets are found
-     * @param name the unique name of the source used to reference this node 
when {@link #addProcessor(String, ProcessorSupplier, String...) adding 
processor children}
-     * @param topics the name of one or more Kafka topics that this source is 
to consume
-     * @return itself
-     * @throws TopologyException if a processor is already added or if topics 
have already been registered by another source
+     * See {@link #addSource(String, String...)}.
      */
     public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
                                            final String name,
@@ -181,19 +176,6 @@ public class Topology {
     }
 
     /**
-     * Add a new source that consumes from topics matching the given pattern
-     * and forward the records to child processor and/or sink nodes.
-     * The source will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value 
deserializer} specified in the
-     * {@link StreamsConfig stream configuration}.
-     * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
-     *
-     * @param offsetReset the auto offset reset policy value for this source 
if no committed offsets found; acceptable values earliest or latest.
-     * @param name the unique name of the source used to reference this node 
when
-     * {@link #addProcessor(String, ProcessorSupplier, String...) adding 
processor children}.
-     * @param topicPattern regular expression pattern to match Kafka topics 
that this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
      * @deprecated Since 4.0. Use {@link 
#addSource(org.apache.kafka.streams.AutoOffsetReset, String, Pattern)} instead.
      */
     @Deprecated
@@ -205,19 +187,7 @@ public class Topology {
     }
 
     /**
-     * Add a new source that consumes from topics matching the given pattern
-     * and forward the records to child processor and/or sink nodes.
-     * The source will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value 
deserializer} specified in the
-     * {@link StreamsConfig stream configuration}.
-     * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
-     *
-     * @param offsetReset the auto offset reset policy value for this source 
if no committed offsets found
-     * @param name the unique name of the source used to reference this node 
when
-     * {@link #addProcessor(String, ProcessorSupplier, String...) adding 
processor children}.
-     * @param topicPattern regular expression pattern to match Kafka topics 
that this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
+     * See {@link #addSource(String, Pattern)}.
      */
     public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
                                            final String name,
@@ -234,18 +204,7 @@ public class Topology {
     }
 
     /**
-     * Add a new source that consumes the named topics and forward the records 
to child processor and/or sink nodes.
-     * The source will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value 
deserializer} specified in the
-     * {@link StreamsConfig stream configuration}.
-     *
-     * @param timestampExtractor the stateless timestamp extractor used for 
this source,
-     *                           if not specified the default extractor 
defined in the configs will be used
-     * @param name               the unique name of the source used to 
reference this node when
-     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}.
-     * @param topics             the name of one or more Kafka topics that 
this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
+     * See {@link #addSource(String, String...)}.
      */
     public synchronized Topology addSource(final TimestampExtractor 
timestampExtractor,
                                            final String name,
@@ -255,19 +214,7 @@ public class Topology {
     }
 
     /**
-     * Add a new source that consumes from topics matching the given pattern
-     * and forward the records to child processor and/or sink nodes.
-     * The source will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value 
deserializer} specified in the
-     * {@link StreamsConfig stream configuration}.
-     *
-     * @param timestampExtractor the stateless timestamp extractor used for 
this source,
-     *                           if not specified the default extractor 
defined in the configs will be used
-     * @param name               the unique name of the source used to 
reference this node when
-     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}.
-     * @param topicPattern       regular expression pattern to match Kafka 
topics that this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
+     * See {@link #addSource(String, Pattern)}.
      */
     public synchronized Topology addSource(final TimestampExtractor 
timestampExtractor,
                                            final String name,
@@ -277,20 +224,6 @@ public class Topology {
     }
 
     /**
-     * Add a new source that consumes the named topics and forward the records 
to child processor and/or sink nodes.
-     * The source will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value 
deserializer} specified in the
-     * {@link StreamsConfig stream configuration}.
-     *
-     * @param offsetReset        the auto offset reset policy to use for this 
source if no committed offsets found;
-     *                           acceptable values earliest or latest
-     * @param timestampExtractor the stateless timestamp extractor used for 
this source,
-     *                           if not specified the default extractor 
defined in the configs will be used
-     * @param name               the unique name of the source used to 
reference this node when
-     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}.
-     * @param topics             the name of one or more Kafka topics that 
this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
      * @deprecated Since 4.0. Use {@link 
#addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor, 
String, String...)} instead.
      */
     @Deprecated
@@ -303,16 +236,7 @@ public class Topology {
     }
 
     /**
-     * Adds a new source that consumes the specified topics with a specified 
{@link TimestampExtractor}
-     * and forwards the records to child processor and/or sink nodes.
-     * The source will use the provided timestamp extractor to determine the 
timestamp of each record.
-     *
-     * @param offsetReset the auto offset reset policy to use if no committed 
offsets are found
-     * @param timestampExtractor the timestamp extractor to use for this source
-     * @param name the unique name of the source used to reference this node 
when {@link #addProcessor(String, ProcessorSupplier, String...) adding 
processor children}
-     * @param topics the name of one or more Kafka topics that this source is 
to consume
-     * @return itself
-     * @throws TopologyException if a processor is already added or if topics 
have already been registered by another source
+     * See {@link #addSource(String, String...)}.
      */
     public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
                                            final TimestampExtractor 
timestampExtractor,
@@ -330,21 +254,6 @@ public class Topology {
     }
 
     /**
-     * Add a new source that consumes from topics matching the given pattern 
and forward the records to child processor
-     * and/or sink nodes.
-     * The source will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value 
deserializer} specified in the
-     * {@link StreamsConfig stream configuration}.
-     *
-     * @param offsetReset        the auto offset reset policy value for this 
source if no committed offsets found;
-     *                           acceptable values earliest or latest.
-     * @param timestampExtractor the stateless timestamp extractor used for 
this source,
-     *                           if not specified the default extractor 
defined in the configs will be used
-     * @param name               the unique name of the source used to 
reference this node when
-     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}.
-     * @param topicPattern       regular expression pattern to match Kafka 
topics that this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
      * @deprecated Since 4.0. Use {@link 
#addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor, 
String, Pattern)} instead.
      */
     @Deprecated
@@ -357,16 +266,7 @@ public class Topology {
     }
 
     /**
-     * Adds a new source that consumes from topics matching the given pattern 
with a specified {@link TimestampExtractor}
-     * and forwards the records to child processor and/or sink nodes.
-     * The source will use the provided timestamp extractor to determine the 
timestamp of each record.
-     *
-     * @param offsetReset the auto offset reset policy to use if no committed 
offsets are found
-     * @param timestampExtractor the timestamp extractor to use for this source
-     * @param name the unique name of the source used to reference this node 
when {@link #addProcessor(String, ProcessorSupplier, String...) adding 
processor children}
-     * @param topicPattern the regular expression pattern to match Kafka 
topics that this source is to consume
-     * @return itself
-     * @throws TopologyException if a processor is already added or if topics 
have already been registered by another source
+     * See {@link #addSource(String, Pattern)}.
      */
     public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
                                            final TimestampExtractor 
timestampExtractor,
@@ -384,107 +284,48 @@ public class Topology {
     }
 
     /**
-     * Add a new source that consumes the named topics and forwards the 
records to child processor and/or sink nodes.
-     * The source will use the specified key and value deserializers.
-     * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
-     *
-     * @param name               the unique name of the source used to 
reference this node when
-     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}
-     * @param keyDeserializer    key deserializer used to read this source, if 
not specified the default
-     *                           key deserializer defined in the configs will 
be used
-     * @param valueDeserializer  value deserializer used to read this source,
-     *                           if not specified the default value 
deserializer defined in the configs will be used
-     * @param topics             the name of one or more Kafka topics that 
this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
+     * See {@link #addSource(String, String...)}.
      */
-    public synchronized Topology addSource(final String name,
-                                           final Deserializer<?> 
keyDeserializer,
-                                           final Deserializer<?> 
valueDeserializer,
-                                           final String... topics) {
+    public synchronized <K, V> Topology addSource(final String name,
+                                                  final Deserializer<K> 
keyDeserializer,
+                                                  final Deserializer<V> 
valueDeserializer,
+                                                  final String... topics) {
         internalTopologyBuilder.addSource(null, name, null, keyDeserializer, 
valueDeserializer, topics);
         return this;
     }
 
     /**
-     * Add a new source that consumes from topics matching the given pattern 
and forwards the records to child processor
-     * and/or sink nodes.
-     * The source will use the specified key and value deserializers.
-     * The provided de-/serializers will be used for all matched topics, so 
care should be taken to specify patterns for
-     * topics that share the same key-value data format.
-     * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
-     *
-     * @param name               the unique name of the source used to 
reference this node when
-     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}
-     * @param keyDeserializer    key deserializer used to read this source, if 
not specified the default
-     *                           key deserializer defined in the configs will 
be used
-     * @param valueDeserializer  value deserializer used to read this source,
-     *                           if not specified the default value 
deserializer defined in the configs will be used
-     * @param topicPattern       regular expression pattern to match Kafka 
topics that this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by name
+     * See {@link #addSource(String, Pattern)}.
      */
-    public synchronized Topology addSource(final String name,
-                                           final Deserializer<?> 
keyDeserializer,
-                                           final Deserializer<?> 
valueDeserializer,
-                                           final Pattern topicPattern) {
+    public synchronized <K, V> Topology addSource(final String name,
+                                                  final Deserializer<K> 
keyDeserializer,
+                                                  final Deserializer<V> 
valueDeserializer,
+                                                  final Pattern topicPattern) {
         internalTopologyBuilder.addSource(null, name, null, keyDeserializer, 
valueDeserializer, topicPattern);
         return this;
     }
 
     /**
-     * Add a new source that consumes from topics matching the given pattern 
and forwards the records to child processor
-     * and/or sink nodes.
-     * The source will use the specified key and value deserializers.
-     * The provided de-/serializers will be used for all the specified topics, 
so care should be taken when specifying
-     * topics that share the same key-value data format.
-     *
-     * @param offsetReset        the auto offset reset policy to use for this 
stream if no committed offsets found;
-     *                           acceptable values are earliest or latest
-     * @param name               the unique name of the source used to 
reference this node when
-     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}
-     * @param keyDeserializer    key deserializer used to read this source, if 
not specified the default
-     *                           key deserializer defined in the configs will 
be used
-     * @param valueDeserializer  value deserializer used to read this source,
-     *                           if not specified the default value 
deserializer defined in the configs will be used
-     * @param topics             the name of one or more Kafka topics that 
this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by name
      * @deprecated Since 4.0. Use {@link 
#addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer, 
Deserializer, String...)} instead.
      */
     @Deprecated
-    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
-                                           final String name,
-                                           final Deserializer<?> 
keyDeserializer,
-                                           final Deserializer<?> 
valueDeserializer,
-                                           final String... topics) {
+    public synchronized <K, V> Topology addSource(final AutoOffsetReset 
offsetReset,
+                                                  final String name,
+                                                  final Deserializer<K> 
keyDeserializer,
+                                                  final Deserializer<V> 
valueDeserializer,
+                                                  final String... topics) {
         internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name, 
null, keyDeserializer, valueDeserializer, topics);
         return this;
     }
 
     /**
-     * Add a new source that consumes from topics matching the given pattern 
and forwards the records to child processor
-     * and/or sink nodes.
-     * The source will use the specified key and value deserializers.
-     * The provided de-/serializers will be used for all the specified topics, 
so care should be taken when specifying
-     * topics that share the same key-value data format.
-     *
-     * @param offsetReset        the auto offset reset policy to use for this 
stream if no committed offsets found
-     * @param name               the unique name of the source used to 
reference this node when
-     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}
-     * @param keyDeserializer    key deserializer used to read this source, if 
not specified the default
-     *                           key deserializer defined in the configs will 
be used
-     * @param valueDeserializer  value deserializer used to read this source,
-     *                           if not specified the default value 
deserializer defined in the configs will be used
-     * @param topics             the name of one or more Kafka topics that 
this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by name
+     * See {@link #addSource(String, String...)}.
      */
-    public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
-                                           final String name,
-                                           final Deserializer<?> 
keyDeserializer,
-                                           final Deserializer<?> 
valueDeserializer,
-                                           final String... topics) {
+    public synchronized <K, V> Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+                                                  final String name,
+                                                  final Deserializer<K> 
keyDeserializer,
+                                                  final Deserializer<V> 
valueDeserializer,
+                                                  final String... topics) {
         internalTopologyBuilder.addSource(
             offsetReset == null ? null : new 
AutoOffsetResetInternal(offsetReset),
             name,
@@ -497,58 +338,26 @@ public class Topology {
     }
 
     /**
-     * Add a new source that consumes from topics matching the given pattern 
and forwards the records to child processor
-     * and/or sink nodes.
-     * The source will use the specified key and value deserializers.
-     * The provided de-/serializers will be used for all matched topics, so 
care should be taken to specify patterns for
-     * topics that share the same key-value data format.
-     *
-     * @param offsetReset        the auto offset reset policy to use for this 
stream if no committed offsets found;
-     *                           acceptable values are earliest or latest
-     * @param name               the unique name of the source used to 
reference this node when
-     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}
-     * @param keyDeserializer    key deserializer used to read this source, if 
not specified the default
-     *                           key deserializer defined in the configs will 
be used
-     * @param valueDeserializer  value deserializer used to read this source,
-     *                           if not specified the default value 
deserializer defined in the configs will be used
-     * @param topicPattern       regular expression pattern to match Kafka 
topics that this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by name
      * @deprecated Since 4.0. Use {@link 
#addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer, 
Deserializer, Pattern)} instead.
      */
     @Deprecated
-    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
-                                           final String name,
-                                           final Deserializer<?> 
keyDeserializer,
-                                           final Deserializer<?> 
valueDeserializer,
-                                           final Pattern topicPattern) {
+    public synchronized <K, V> Topology addSource(final AutoOffsetReset 
offsetReset,
+                                                  final String name,
+                                                  final Deserializer<K> 
keyDeserializer,
+                                                  final Deserializer<V> 
valueDeserializer,
+                                                  final Pattern topicPattern) {
         internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name, 
null, keyDeserializer, valueDeserializer, topicPattern);
         return this;
     }
 
     /**
-     * Add a new source that consumes from topics matching the given pattern 
and forwards the records to child processor
-     * and/or sink nodes.
-     * The source will use the specified key and value deserializers.
-     * The provided de-/serializers will be used for all matched topics, so 
care should be taken to specify patterns for
-     * topics that share the same key-value data format.
-     *
-     * @param offsetReset        the auto offset reset policy to use for this 
stream if no committed offsets found
-     * @param name               the unique name of the source used to 
reference this node when
-     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}
-     * @param keyDeserializer    key deserializer used to read this source, if 
not specified the default
-     *                           key deserializer defined in the configs will 
be used
-     * @param valueDeserializer  value deserializer used to read this source,
-     *                           if not specified the default value 
deserializer defined in the configs will be used
-     * @param topicPattern       regular expression pattern to match Kafka 
topics that this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by name
+     * See {@link #addSource(String, Pattern)}.
      */
-    public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
-                                           final String name,
-                                           final Deserializer<?> 
keyDeserializer,
-                                           final Deserializer<?> 
valueDeserializer,
-                                           final Pattern topicPattern) {
+    public synchronized <K, V> Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+                                                  final String name,
+                                                  final Deserializer<K> 
keyDeserializer,
+                                                  final Deserializer<V> 
valueDeserializer,
+                                                  final Pattern topicPattern) {
         internalTopologyBuilder.addSource(
             offsetReset == null ? null : new 
AutoOffsetResetInternal(offsetReset),
             name,
@@ -561,58 +370,28 @@ public class Topology {
     }
 
     /**
-     * Add a new source that consumes the named topics and forwards the 
records to child processor and/or sink nodes.
-     * The source will use the specified key and value deserializers.
-     *
-     * @param offsetReset        the auto offset reset policy to use for this 
stream if no committed offsets found;
-     *                           acceptable values are earliest or latest.
-     * @param name               the unique name of the source used to 
reference this node when
-     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}.
-     * @param timestampExtractor the stateless timestamp extractor used for 
this source,
-     *                           if not specified the default extractor 
defined in the configs will be used
-     * @param keyDeserializer    key deserializer used to read this source, if 
not specified the default
-     *                           key deserializer defined in the configs will 
be used
-     * @param valueDeserializer  value deserializer used to read this source,
-     *                           if not specified the default value 
deserializer defined in the configs will be used
-     * @param topics             the name of one or more Kafka topics that 
this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
      * @deprecated Since 4.0. Use {@link 
#addSource(org.apache.kafka.streams.AutoOffsetReset, String, 
TimestampExtractor, Deserializer, Deserializer, String...)} instead.
      */
     @Deprecated
-    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
-                                           final String name,
-                                           final TimestampExtractor 
timestampExtractor,
-                                           final Deserializer<?> 
keyDeserializer,
-                                           final Deserializer<?> 
valueDeserializer,
-                                           final String... topics) {
+    public synchronized <K, V> Topology addSource(final AutoOffsetReset 
offsetReset,
+                                                  final String name,
+                                                  final TimestampExtractor 
timestampExtractor,
+                                                  final Deserializer<K> 
keyDeserializer,
+                                                  final Deserializer<V> 
valueDeserializer,
+                                                  final String... topics) {
         internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name, 
timestampExtractor, keyDeserializer, valueDeserializer, topics);
         return this;
     }
 
     /**
-     * Add a new source that consumes the named topics and forwards the 
records to child processor and/or sink nodes.
-     * The source will use the specified key and value deserializers.
-     *
-     * @param offsetReset        the auto offset reset policy to use for this 
stream if no committed offsets found
-     * @param name               the unique name of the source used to 
reference this node when
-     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}.
-     * @param timestampExtractor the stateless timestamp extractor used for 
this source,
-     *                           if not specified the default extractor 
defined in the configs will be used
-     * @param keyDeserializer    key deserializer used to read this source, if 
not specified the default
-     *                           key deserializer defined in the configs will 
be used
-     * @param valueDeserializer  value deserializer used to read this source,
-     *                           if not specified the default value 
deserializer defined in the configs will be used
-     * @param topics             the name of one or more Kafka topics that 
this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
+     * See {@link #addSource(String, String...)}.
      */
-    public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
-                                           final String name,
-                                           final TimestampExtractor 
timestampExtractor,
-                                           final Deserializer<?> 
keyDeserializer,
-                                           final Deserializer<?> 
valueDeserializer,
-                                           final String... topics) {
+    public synchronized <K, V> Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+                                                  final String name,
+                                                  final TimestampExtractor 
timestampExtractor,
+                                                  final Deserializer<K> 
keyDeserializer,
+                                                  final Deserializer<V> 
valueDeserializer,
+                                                  final String... topics) {
         internalTopologyBuilder.addSource(
             offsetReset == null ? null : new 
AutoOffsetResetInternal(offsetReset),
             name,
@@ -625,64 +404,28 @@ public class Topology {
     }
 
     /**
-     * Add a new source that consumes from topics matching the given pattern 
and forwards the records to child processor
-     * and/or sink nodes.
-     * The source will use the specified key and value deserializers.
-     * The provided de-/serializers will be used for all matched topics, so 
care should be taken to specify patterns for
-     * topics that share the same key-value data format.
-     *
-     * @param offsetReset        the auto offset reset policy to use for this 
stream if no committed offsets found;
-     *                           acceptable values are earliest or latest
-     * @param name               the unique name of the source used to 
reference this node when
-     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}.
-     * @param timestampExtractor the stateless timestamp extractor used for 
this source,
-     *                           if not specified the default extractor 
defined in the configs will be used
-     * @param keyDeserializer    key deserializer used to read this source, if 
not specified the default
-     *                           key deserializer defined in the configs will 
be used
-     * @param valueDeserializer  value deserializer used to read this source,
-     *                           if not specified the default value 
deserializer defined in the configs will be used
-     * @param topicPattern       regular expression pattern to match Kafka 
topics that this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by name
      * @deprecated Since 4.0. Use {@link 
#addSource(org.apache.kafka.streams.AutoOffsetReset, String, 
TimestampExtractor, Deserializer, Deserializer, Pattern)} instead.
      */
     @Deprecated
-    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
-                                           final String name,
-                                           final TimestampExtractor 
timestampExtractor,
-                                           final Deserializer<?> 
keyDeserializer,
-                                           final Deserializer<?> 
valueDeserializer,
-                                           final Pattern topicPattern) {
+    public synchronized <K, V> Topology addSource(final AutoOffsetReset 
offsetReset,
+                                                  final String name,
+                                                  final TimestampExtractor 
timestampExtractor,
+                                                  final Deserializer<K> 
keyDeserializer,
+                                                  final Deserializer<V> 
valueDeserializer,
+                                                  final Pattern topicPattern) {
         internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name, 
timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
         return this;
     }
 
     /**
-     * Add a new source that consumes from topics matching the given pattern 
and forwards the records to child processor
-     * and/or sink nodes.
-     * The source will use the specified key and value deserializers.
-     * The provided de-/serializers will be used for all matched topics, so 
care should be taken to specify patterns for
-     * topics that share the same key-value data format.
-     *
-     * @param offsetReset        the auto offset reset policy to use for this 
stream if no committed offsets found
-     * @param name               the unique name of the source used to 
reference this node when
-     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}.
-     * @param timestampExtractor the stateless timestamp extractor used for 
this source,
-     *                           if not specified the default extractor 
defined in the configs will be used
-     * @param keyDeserializer    key deserializer used to read this source, if 
not specified the default
-     *                           key deserializer defined in the configs will 
be used
-     * @param valueDeserializer  value deserializer used to read this source,
-     *                           if not specified the default value 
deserializer defined in the configs will be used
-     * @param topicPattern       regular expression pattern to match Kafka 
topics that this source is to consume
-     * @return itself
-     * @throws TopologyException if processor is already added or if topics 
have already been registered by name
+     * See {@link #addSource(String, Pattern)}.
      */
-    public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
-                                           final String name,
-                                           final TimestampExtractor 
timestampExtractor,
-                                           final Deserializer<?> 
keyDeserializer,
-                                           final Deserializer<?> 
valueDeserializer,
-                                           final Pattern topicPattern) {
+    public synchronized <K, V> Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+                                                  final String name,
+                                                  final TimestampExtractor 
timestampExtractor,
+                                                  final Deserializer<K> 
keyDeserializer,
+                                                  final Deserializer<V> 
valueDeserializer,
+                                                  final Pattern topicPattern) {
         internalTopologyBuilder.addSource(
             offsetReset == null ? null : new 
AutoOffsetResetInternal(offsetReset),
             name,
@@ -695,20 +438,40 @@ public class Topology {
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to the named Kafka topic.
-     * The sink will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
-     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value 
serializer} specified in the
-     * {@link StreamsConfig stream configuration}.
+     * Add a sink that sends records from upstream
+     * {@link #addProcessor(String, ProcessorSupplier, String...) processors} 
or
+     * {@link #addSource(String, String...) sources} to the named Kafka topic.
+     * The specified topic should be created before the {@link KafkaStreams} 
instance is started.
+     *
+     * <p>The sink will use the default values from {@link StreamsConfig} for
+     * <ul>
+     *   <li>{@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG key 
serializer}</li>
+     *   <li>{@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG value 
serializer}</li>
+     * </ul>
+     *
+     * Furthermore, the producer's configured partitioner is used to write 
into the topic.
+     * If you want to specify a sink-specific key or value {@link Serializer}, 
or use a different
+     * {@link StreamPartitioner partitioner}, use the corresponding overloaded 
{@code addSink(...)} method.
+     *
+     * @param name
+     *        the unique name of the sink
+     * @param topic
+     *        the name of the Kafka topic to which this sink should write its 
records
+     * @param parentNames
+     *        the name of one or more {@link #addProcessor(String, 
ProcessorSupplier, String...) processors} or
+     *        {@link #addSource(String, String...) sources}, whose output 
records this sink should consume and write
+     *        to the specified output topic
      *
-     * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should 
write its records
-     * @param parentNames the name of one or more source or processor nodes 
whose output records this sink should consume
-     * and write to its topic
      * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name
-     * @see #addSink(String, String, StreamPartitioner, String...)
-     * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
+     *
+     * @throws TopologyException
+     *         if the provided sink name is not unique, or
+     *         if a parent processor/source name is unknown or specifies a sink
+     * @throws NullPointerException
+     *         if {@code name}, {@code topic}, or {@code parentNames} is 
{@code null}, or
+     *         {@code parentNames} contains a {@code null} parent name
+     *
+     * @see #addSink(String, TopicNameExtractor, String...)
      */
     public synchronized Topology addSink(final String name,
                                          final String topic,
@@ -718,29 +481,7 @@ public class Topology {
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to the named Kafka topic,
-     * using the supplied partitioner.
-     * The sink will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
-     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value 
serializer} specified in the
-     * {@link StreamsConfig stream configuration}.
-     * <p>
-     * The sink will also use the specified {@link StreamPartitioner} to 
determine how records are distributed among
-     * the named Kafka topic's partitions.
-     * Such control is often useful with topologies that use {@link 
#addStateStore(StoreBuilder, String...) state
-     * stores} in its processors.
-     * In most other cases, however, a partitioner needs not be specified and 
Kafka will automatically distribute
-     * records among partitions using Kafka's default partitioning logic.
-     *
-     * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should 
write its records
-     * @param partitioner the function that should be used to determine the 
partition for each record processed by the sink
-     * @param parentNames the name of one or more source or processor nodes 
whose output records this sink should consume
-     * and write to its topic
-     * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name
-     * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
+     * See {@link #addSink(String, String, String...)}.
      */
     public synchronized <K, V> Topology addSink(final String name,
                                                 final String topic,
@@ -751,24 +492,7 @@ public class Topology {
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to the named Kafka topic.
-     * The sink will use the specified key and value serializers.
-     *
-     * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should 
write its records
-     * @param keySerializer the {@link Serializer key serializer} used when 
consuming records; may be null if the sink
-     * should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG 
default key serializer} specified in the
-     * {@link StreamsConfig stream configuration}
-     * @param valueSerializer the {@link Serializer value serializer} used 
when consuming records; may be null if the sink
-     * should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG 
default value serializer} specified in the
-     * {@link StreamsConfig stream configuration}
-     * @param parentNames the name of one or more source or processor nodes 
whose output records this sink should consume
-     * and write to its topic
-     * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name
-     * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, StreamPartitioner, String...)
-     * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
+     * See {@link #addSink(String, String, String...)}.
      */
     public synchronized <K, V> Topology addSink(final String name,
                                                 final String topic,
@@ -780,25 +504,7 @@ public class Topology {
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to the named Kafka topic.
-     * The sink will use the specified key and value serializers, and the 
supplied partitioner.
-     *
-     * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should 
write its records
-     * @param keySerializer the {@link Serializer key serializer} used when 
consuming records; may be null if the sink
-     * should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG 
default key serializer} specified in the
-     * {@link StreamsConfig stream configuration}
-     * @param valueSerializer the {@link Serializer value serializer} used 
when consuming records; may be null if the sink
-     * should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG 
default value serializer} specified in the
-     * {@link StreamsConfig stream configuration}
-     * @param partitioner the function that should be used to determine the 
partition for each record processed by the sink
-     * @param parentNames the name of one or more source or processor nodes 
whose output records this sink should consume
-     * and write to its topic
-     * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name
-     * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, StreamPartitioner, String...)
-     * @see #addSink(String, String, Serializer, Serializer, String...)
+     * See {@link #addSink(String, String, String...)}.
      */
     public synchronized <K, V> Topology addSink(final String name,
                                                 final String topic,
@@ -811,57 +517,27 @@ public class Topology {
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to Kafka topics based on {@code topicExtractor}.
-     * The topics that it may ever send to should be pre-created.
-     * The sink will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
-     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value 
serializer} specified in the
-     * {@link StreamsConfig stream configuration}.
+     * See {@link #addSink(String, String, String...)}.
      *
-     * @param name              the unique name of the sink
-     * @param topicExtractor    the extractor to determine the name of the 
Kafka topic to which this sink should write for each record
-     * @param parentNames       the name of one or more source or processor 
nodes whose output records this sink should consume
-     *                          and dynamically write to topics
-     * @return                  itself
-     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name
-     * @see #addSink(String, String, StreamPartitioner, String...)
-     * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
+     * <p>Takes a {@link TopicNameExtractor} (cannot be {@code null}) that 
computes topic names to send records into,
+     * instead of a single topic name.
+     * The topic name extractor is called for every result record and may 
compute a different topic name each time.
+     * All topics that the topic name extractor may compute should be 
created before the {@link KafkaStreams}
+     * instance is started.
+     * Returning {@code null} as the topic name is invalid and will result in a 
runtime exception.
      */
     public synchronized <K, V> Topology addSink(final String name,
-                                                final TopicNameExtractor<K, V> 
topicExtractor,
+                                                final TopicNameExtractor<? 
super K, ? super V> topicExtractor,
                                                 final String... parentNames) {
         internalTopologyBuilder.addSink(name, topicExtractor, null, null, 
null, parentNames);
         return this;
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to Kafka topics based on {@code topicExtractor},
-     * using the supplied partitioner.
-     * The topics that it may ever send to should be pre-created.
-     * The sink will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
-     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value 
serializer} specified in the
-     * {@link StreamsConfig stream configuration}.
-     * <p>
-     * The sink will also use the specified {@link StreamPartitioner} to 
determine how records are distributed among
-     * the named Kafka topic's partitions.
-     * Such control is often useful with topologies that use {@link 
#addStateStore(StoreBuilder, String...) state
-     * stores} in its processors.
-     * In most other cases, however, a partitioner needs not be specified and 
Kafka will automatically distribute
-     * records among partitions using Kafka's default partitioning logic.
-     *
-     * @param name              the unique name of the sink
-     * @param topicExtractor    the extractor to determine the name of the 
Kafka topic to which this sink should write for each record
-     * @param partitioner       the function that should be used to determine 
the partition for each record processed by the sink
-     * @param parentNames       the name of one or more source or processor 
nodes whose output records this sink should consume
-     *                          and dynamically write to topics
-     * @return                  itself
-     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name
-     * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
+     * See {@link #addSink(String, String, String...)}.
      */
     public synchronized <K, V> Topology addSink(final String name,
-                                                final TopicNameExtractor<K, V> 
topicExtractor,
+                                                final TopicNameExtractor<? 
super K, ? super V> topicExtractor,
                                                 final StreamPartitioner<? 
super K, ? super V> partitioner,
                                                 final String... parentNames) {
         internalTopologyBuilder.addSink(name, topicExtractor, null, null, 
partitioner, parentNames);
@@ -869,28 +545,10 @@ public class Topology {
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to Kafka topics based on {@code topicExtractor}.
-     * The topics that it may ever send to should be pre-created.
-     * The sink will use the specified key and value serializers.
-     *
-     * @param name              the unique name of the sink
-     * @param topicExtractor    the extractor to determine the name of the 
Kafka topic to which this sink should write for each record
-     * @param keySerializer     the {@link Serializer key serializer} used 
when consuming records; may be null if the sink
-     *                          should use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified 
in the
-     *                          {@link StreamsConfig stream configuration}
-     * @param valueSerializer   the {@link Serializer value serializer} used 
when consuming records; may be null if the sink
-     *                          should use the {@link 
StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} 
specified in the
-     *                          {@link StreamsConfig stream configuration}
-     * @param parentNames       the name of one or more source or processor 
nodes whose output records this sink should consume
-     *                          and dynamically write to topics
-     * @return                  itself
-     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name
-     * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, StreamPartitioner, String...)
-     * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
+     * See {@link #addSink(String, String, String...)}.
      */
     public synchronized <K, V> Topology addSink(final String name,
-                                                final TopicNameExtractor<K, V> 
topicExtractor,
+                                                final TopicNameExtractor<? 
super K, ? super V> topicExtractor,
                                                 final Serializer<K> 
keySerializer,
                                                 final Serializer<V> 
valueSerializer,
                                                 final String... parentNames) {
@@ -899,29 +557,10 @@ public class Topology {
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to Kafka topics based on {@code topicExtractor}.
-     * The topics that it may ever send to should be pre-created.
-     * The sink will use the specified key and value serializers, and the 
supplied partitioner.
-     *
-     * @param name              the unique name of the sink
-     * @param topicExtractor    the extractor to determine the name of the 
Kafka topic to which this sink should write for each record
-     * @param keySerializer     the {@link Serializer key serializer} used 
when consuming records; may be null if the sink
-     *                          should use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified 
in the
-     *                          {@link StreamsConfig stream configuration}
-     * @param valueSerializer   the {@link Serializer value serializer} used 
when consuming records; may be null if the sink
-     *                          should use the {@link 
StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} 
specified in the
-     *                          {@link StreamsConfig stream configuration}
-     * @param partitioner       the function that should be used to determine 
the partition for each record processed by the sink
-     * @param parentNames       the name of one or more source or processor 
nodes whose output records this sink should consume
-     *                          and dynamically write to topics
-     * @return                  itself
-     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name
-     * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, StreamPartitioner, String...)
-     * @see #addSink(String, String, Serializer, Serializer, String...)
+     * See {@link #addSink(String, String, String...)}.
      */
     public synchronized <K, V> Topology addSink(final String name,
-                                                final TopicNameExtractor<K, V> 
topicExtractor,
+                                                final TopicNameExtractor<? 
super K, ? super V> topicExtractor,
                                                 final Serializer<K> 
keySerializer,
                                                 final Serializer<V> 
valueSerializer,
                                                 final StreamPartitioner<? 
super K, ? super V> partitioner,
@@ -931,18 +570,39 @@ public class Topology {
     }
 
     /**
-     * Add a new processor node that receives and processes records output by 
one or more parent source or processor
-     * node.
-     * Any new record output by this processor will be forwarded to its child 
processor or sink nodes.
-     * If {@code supplier} provides stores via {@link 
ConnectedStoreProvider#stores()}, the provided {@link StoreBuilder}s
-     * will be added to the topology and connected to this processor 
automatically.
+     * Add a {@link Processor processor} that receives and processes records 
from one or more parent processors or
+     * {@link #addSource(String, String...) sources}.
+     * Any record output by this processor will be forwarded to its child 
processors and
+     * {@link #addSink(String, String, String...) sinks}.
+     *
+     * <p>By default, the processor is stateless.
+     * There are three different types of {@link StateStore state stores}, which can be 
connected to a processor:
+     * <ul>
+     *   <li>{@link #addStateStore(StoreBuilder, String...) state stores} for 
processing (i.e., read/write access)</li>
+     *   <li>{@link #addReadOnlyStateStore(StoreBuilder, String, 
TimestampExtractor, Deserializer, Deserializer, String, String, 
ProcessorSupplier) read-only state stores}</li>
+     *   <li>{@link #addGlobalStore(StoreBuilder, String, Deserializer, 
Deserializer, String, String, ProcessorSupplier) global state stores} 
(read-only)</li>
+     * </ul>
+     *
+     * If the {@code supplier} provides state stores via {@link 
ConnectedStoreProvider#stores()}, the corresponding
+     * {@link StoreBuilder StoreBuilders} will be {@link 
#addStateStore(StoreBuilder, String...) added to the topology
+     * and connected} to this processor automatically.
+     *
+     * @param name
+     *        the unique name of the processor used to reference this node 
when adding other processor or
+     *        {@link #addSink(String, String, String...) sink} children
+     * @param supplier
+     *        the supplier used to obtain {@link Processor} instances
+     * @param parentNames
+     *        the name of one or more processors or {@link #addSource(String, 
String...) sources},
+     *        whose output records this processor should receive and process
      *
-     * @param name the unique name of the processor node
-     * @param supplier the supplier used to obtain this node's {@link 
Processor} instance
-     * @param parentNames the name of one or more source or processor nodes 
whose output records this processor should receive
-     * and process
      * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name
+     *
+     * @throws TopologyException
+     *         if the provided processor name is not unique, or
+     *         if a parent processor/source name is unknown or specifies a sink
+     *
+     * @see org.apache.kafka.streams.processor.api.ContextualProcessor 
ContextualProcessor
      */
     public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(final 
String name,
                                                                      final 
ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
@@ -960,52 +620,151 @@ public class Topology {
     }
 
     /**
-     * Adds a state store.
+     * Add a {@link StateStore state store} to the topology, and optionally 
connect it to one or more
+     * {@link #addProcessor(String, ProcessorSupplier, String...) processors}.
+     * State stores are sharded and the number of shards is determined at 
runtime by the number of input topic
+     * partitions and the structure of the topology.
+     * Each connected {@link Processor} instance in the topology has access to 
a single shard of the state store.
+     * Additionally, the state store can be accessed from "outside" using 
"Interactive Queries" (cf.,
+     * {@link KafkaStreams#store(StoreQueryParameters)} and {@link 
KafkaStreams#query(StateQueryRequest)}).
+     * If you need access to all data in a state store inside a {@link 
Processor}, you can use a (read-only)
+     * {@link #addGlobalStore(StoreBuilder, String, Deserializer, 
Deserializer, String, String, ProcessorSupplier)
+     * global state store}.
+     *
+     * <p>If no {@code processorNames} is specified, the state store can be
+     * {@link #connectProcessorAndStateStores(String, String...) connected} to 
one or more
+     * {@link #addProcessor(String, ProcessorSupplier, String...) processors} 
later.
+     *
+     * <p>Note, if a state store is never connected to any
+     * {@link #addProcessor(String, ProcessorSupplier, String...) processor}, 
the state store is "dangling" and would
+     * not be added to the created {@code ProcessorTopology}, when {@link 
KafkaStreams} is started.
+     * For this case, the state store is not available for "Interactive 
Queries".
+     * If you want to add a state store only for "Interactive Queries", you 
can use a
+     * {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, 
Deserializer, String, String, ProcessorSupplier) read-only state store}.
+     *
+     * <p>For failure and recovery, a state store {@link 
StoreBuilder#loggingEnabled() may be backed} by an internal
+     * changelog topic that will be created in Kafka.
+     * The changelog topic will be named 
"${applicationId}-&lt;storeName&gt;-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"storeName" is provided by the
+     * {@link StoreBuilder#name() store builder}, and "-changelog" is a fixed 
suffix.
+     *
+     * <p>You can verify the created {@code ProcessorTopology} and added state 
stores, and retrieve all generated
+     * internal topic names, via {@link Topology#describe()}.
+     *
+     * @param storeBuilder
+     *        the {@link StoreBuilder} used to obtain {@link StateStore state 
store} instances (one per shard)
+     * @param processorNames
+     *        the names of the {@link #addProcessor(String, ProcessorSupplier, 
String...) processors} that should be
+     *        able to access the provided state store
      *
-     * @param storeBuilder the storeBuilder used to obtain this state store 
{@link StateStore} instance
-     * @param processorNames the names of the processors that should be able 
to access the provided store
      * @return itself
-     * @throws TopologyException if state store supplier is already added
+     *
+     * @throws TopologyException
+     *         if the {@link StoreBuilder#name() state store} was already 
added, or
+     *         if a processor name is unknown or specifies a source or sink
      */
-    public synchronized Topology addStateStore(final StoreBuilder<?> 
storeBuilder,
-                                               final String... processorNames) 
{
+    public synchronized <S extends StateStore> Topology addStateStore(final 
StoreBuilder<S> storeBuilder,
+                                                                      final 
String... processorNames) {
         internalTopologyBuilder.addStateStore(storeBuilder, processorNames);
         return this;
     }
 
     /**
-     * Adds a read-only {@link StateStore} to the topology.
-     * <p>
-     * A read-only {@link StateStore} does not create a dedicated changelog 
topic but uses it's input topic as
-     * changelog; thus, the used topic should be configured with log 
compaction.
-     * <p>
-     * The <code>auto.offset.reset</code> property will be set to 
<code>earliest</code> for this topic.
-     * <p>
-     * The provided {@link ProcessorSupplier} will be used to create a 
processor for all messages received
-     * from the given topic. This processor should contain logic to keep the 
{@link StateStore} up-to-date.
-     *
-     * @param storeBuilder          user defined store builder
-     * @param sourceName            name of the {@link SourceNode} that will 
be automatically added
-     * @param timestampExtractor    the stateless timestamp extractor used for 
this source,
-     *                              if not specified the default extractor 
defined in the configs will be used
-     * @param keyDeserializer       the {@link Deserializer} to deserialize 
keys with
-     * @param valueDeserializer     the {@link Deserializer} to deserialize 
values with
-     * @param topic                 the topic to source the data from
-     * @param processorName         the name of the {@link ProcessorSupplier}
-     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
-     * @return itself
-     * @throws TopologyException if the processor of state is already 
registered
-     */
-    public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final 
StoreBuilder<?> storeBuilder,
-                                                                  final String 
sourceName,
-                                                                  final 
TimestampExtractor timestampExtractor,
-                                                                  final 
Deserializer<KIn> keyDeserializer,
-                                                                  final 
Deserializer<VIn> valueDeserializer,
-                                                                  final String 
topic,
-                                                                  final String 
processorName,
-                                                                  final 
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
-        storeBuilder.withLoggingDisabled();
+     * Adds a read-only {@link StateStore state store} to the topology.
+     * The state store will be populated with data from the named source topic.
+     * State stores are sharded and the number of shards is determined at 
runtime by the number of input topic
+     * partitions for the source topic <em>and</em> the connected processors 
(if any).
+     * Read-only state stores can be accessed from "outside" using 
"Interactive Queries" (cf.,
+     * {@link KafkaStreams#store(StoreQueryParameters)} and {@link 
KafkaStreams#query(StateQueryRequest)}).
+     *
+     * <p>The {@code auto.offset.reset} property will be set to {@code 
"earliest"} for the source topic.
+     * If you want to specify a source-specific {@link TimestampExtractor}, you 
can use
+     * {@link #addReadOnlyStateStore(StoreBuilder, String, TimestampExtractor, 
Deserializer, Deserializer, String, String, ProcessorSupplier)}.
+     *
+     * <p>{@link #connectProcessorAndStateStores(String, String...) 
Connecting} a read-only state store to
+     * {@link #addProcessor(String, ProcessorSupplier, String...) processors} 
is optional.
+     * If not connected to any processor, the state store will still be 
created and can be queried via
+     * {@link KafkaStreams#store(StoreQueryParameters)} or {@link 
KafkaStreams#query(StateQueryRequest)}.
+     * If the state store is connected to another processor, each 
corresponding {@link Processor} instance in the
+     * topology has <em>read-only</em> access to a single shard of the state 
store.
+     * If you need write access to a state store, you can use a
+     * {@link #addStateStore(StoreBuilder, String...) "regular" state store} 
instead.
+     * If you need access to all data in a state store inside a {@link 
Processor}, you can use a (read-only)
+     * {@link #addGlobalStore(StoreBuilder, String, Deserializer, 
Deserializer, String, String, ProcessorSupplier)
+     * global state store}.
+     *
+     * <p>The provided {@link ProcessorSupplier} will be used to create {@link 
Processor} instances which will be used
+     * to process the records from the source topic.
+     * These {@link Processor processors} are the only ones with 
<em>write</em> access to the state store,
+     * and should contain logic to keep the {@link StateStore} up-to-date.
+     *
+     * <p>Read-only state stores are always enabled for fault-tolerance and 
recovery.
+     * In contrast to {@link #addStateStore(StoreBuilder, String...) "regular" 
state stores} no dedicated changelog
+     * topic will be created in Kafka; instead, the source topic is used for 
recovery.
+     * Thus, the source topic should be configured with log compaction.
+     *
+     * @param storeBuilder
+     *        the {@link StoreBuilder} used to obtain {@link StateStore state 
store} instances (one per shard)
+     * @param sourceName
+     *        the unique name of the internally added {@link 
#addSource(String, String...) source}
+     * @param keyDeserializer
+     *        the {@link Deserializer} for record keys
+     *        (can be {@code null} to use the default key deserializer from 
{@link StreamsConfig})
+     * @param valueDeserializer
+     *        the {@link Deserializer} for record values
+     *        (can be {@code null} to use the default value deserializer from 
{@link StreamsConfig})
+     * @param topic
+     *        the source topic to read the data from
+     * @param processorName
+     *        the unique name of the internally added
+     *        {@link #addProcessor(String, ProcessorSupplier, String...) 
processor} which maintains the state store
+     * @param stateUpdateSupplier
+     *        the supplier used to obtain {@link Processor} instances, which 
maintain the state store
+     *
+     * @return itself
+     *
+     * @throws TopologyException
+     *         if the {@link StoreBuilder#name() state store} was already 
added, or
+     *         if the source or processor names are not unique, or
+     *         if the source topic has already been registered by another
+     *         {@link #addSource(String, String...) source}, read-only 
state store, or
+     *         {@link #addGlobalStore(StoreBuilder, String, Deserializer, 
Deserializer, String, String, ProcessorSupplier) global state store}
+     */
+    public synchronized <K, V, S extends StateStore> Topology 
addReadOnlyStateStore(
+        final StoreBuilder<S> storeBuilder,
+        final String sourceName,
+        final Deserializer<K> keyDeserializer,
+        final Deserializer<V> valueDeserializer,
+        final String topic,
+        final String processorName,
+        final ProcessorSupplier<K, V, Void, Void> stateUpdateSupplier
+    ) {
+        return addReadOnlyStateStore(
+            storeBuilder,
+            sourceName,
+            null,
+            keyDeserializer,
+            valueDeserializer,
+            topic,
+            processorName,
+            stateUpdateSupplier
+        );
+    }
 
+    /**
+     * See {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, 
Deserializer, String, String, ProcessorSupplier)}.
+     */
+    public synchronized <K, V, S extends StateStore> Topology 
addReadOnlyStateStore(
+        final StoreBuilder<S> storeBuilder,
+        final String sourceName,
+        final TimestampExtractor timestampExtractor,
+        final Deserializer<K> keyDeserializer,
+        final Deserializer<V> valueDeserializer,
+        final String topic,
+        final String processorName,
+        final ProcessorSupplier<K, V, Void, Void> stateUpdateSupplier
+    ) {
         internalTopologyBuilder.addSource(
             new 
AutoOffsetResetInternal(org.apache.kafka.streams.AutoOffsetReset.earliest()),
             sourceName,
@@ -1016,82 +775,86 @@ public class Topology {
         );
         internalTopologyBuilder.addProcessor(processorName, 
stateUpdateSupplier, sourceName);
         internalTopologyBuilder.addStateStore(storeBuilder, processorName);
+
+        // connect the source topic as (read-only) changelog topic for 
fault-tolerance
+        storeBuilder.withLoggingDisabled();
         
internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
 
         return this;
     }
 
-    /**
-     * Adds a read-only {@link StateStore} to the topology.
-     * <p>
-     * A read-only {@link StateStore} does not create a dedicated changelog 
topic but uses it's input topic as
-     * changelog; thus, the used topic should be configured with log 
compaction.
-     * <p>
-     * The <code>auto.offset.reset</code> property will be set to 
<code>earliest</code> for this topic.
-     * <p>
-     * The provided {@link ProcessorSupplier} will be used to create a 
processor for all messages received
-     * from the given topic. This processor should contain logic to keep the 
{@link StateStore} up-to-date.
-     * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
-     *
-     * @param storeBuilder          user defined store builder
-     * @param sourceName            name of the {@link SourceNode} that will 
be automatically added
-     * @param keyDeserializer       the {@link Deserializer} to deserialize 
keys with
-     * @param valueDeserializer     the {@link Deserializer} to deserialize 
values with
-     * @param topic                 the topic to source the data from
-     * @param processorName         the name of the {@link ProcessorSupplier}
-     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
-     * @return itself
-     * @throws TopologyException if the processor of state is already 
registered
-     */
-    public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final 
StoreBuilder<?> storeBuilder,
-                                                                  final String 
sourceName,
-                                                                  final 
Deserializer<KIn> keyDeserializer,
-                                                                  final 
Deserializer<VIn> valueDeserializer,
-                                                                  final String 
topic,
-                                                                  final String 
processorName,
-                                                                  final 
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
-        return addReadOnlyStateStore(
-                storeBuilder,
-                sourceName,
-                null,
-                keyDeserializer,
-                valueDeserializer,
-                topic,
-                processorName,
-                stateUpdateSupplier
-        );
-    }
 
     /**
-     * Adds a global {@link StateStore} to the topology.
-     * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
-     * There will be exactly one instance of this {@link StateStore} per Kafka 
Streams instance.
-     * <p>
-     * A {@link SourceNode} with the provided sourceName will be added to 
consume the data arriving from the partitions
-     * of the input topic.
-     * <p>
-     * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all
-     * records forwarded from the {@link SourceNode}.
-     * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
-     * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
-     *
-     * @param storeBuilder          user defined state store builder
-     * @param sourceName            name of the {@link SourceNode} that will 
be automatically added
-     * @param keyDeserializer       the {@link Deserializer} to deserialize 
keys with
-     * @param valueDeserializer     the {@link Deserializer} to deserialize 
values with
-     * @param topic                 the topic to source the data from
-     * @param processorName         the name of the {@link ProcessorSupplier}
-     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
-     * @return itself
-     * @throws TopologyException if the processor of state is already 
registered
-     */
-    public synchronized <KIn, VIn> Topology addGlobalStore(final 
StoreBuilder<?> storeBuilder,
-                                                           final String 
sourceName,
-                                                           final 
Deserializer<KIn> keyDeserializer,
-                                                           final 
Deserializer<VIn> valueDeserializer,
-                                                           final String topic,
-                                                           final String 
processorName,
-                                                           final 
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+     * Adds a global {@link StateStore state store} to the topology.
+     * The state store will be populated with data from the named source topic.
+     * Global state stores are read-only, and contain data from all partitions 
of the specified source topic.
+     * Thus, each {@link KafkaStreams} instance has a full copy of the data; 
the source topic records are effectively
+     * broadcast to all instances.
+     * In contrast to
+     * {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, 
Deserializer, String, String, ProcessorSupplier) read-only state stores}
+     * global state stores are "bootstrapped" on startup, and are maintained 
by a separate thread.
+     * Thus, updates to a global store are not "stream-time synchronized", which 
may lead to non-deterministic results.
+     * Like all other stores, global state stores can be accessed from 
"outside" using "Interactive Queries" (cf.,
+     * {@link KafkaStreams#store(StoreQueryParameters)} and {@link 
KafkaStreams#query(StateQueryRequest)}).
+     *
+     * <p>The {@code auto.offset.reset} property will be set to {@code 
"earliest"} for the source topic.
+     * If you want to specify a source-specific {@link TimestampExtractor}, you 
can use
+     * {@link #addGlobalStore(StoreBuilder, String, TimestampExtractor, 
Deserializer, Deserializer, String, String, ProcessorSupplier)}.
+     *
+     * <p>All {@link #addProcessor(String, ProcessorSupplier, String...) 
processors} of the topology automatically
+     * have read-only access to the global store; it is not necessary to 
connect them.
+     * If you need write access to a state store, you can use a
+     * {@link #addStateStore(StoreBuilder, String...) "regular" state store} 
instead.
+     *
+     * <p>The provided {@link ProcessorSupplier} will be used to create {@link 
Processor} instances which will be used
+     * to process the records from the source topic.
+     * These {@link Processor processors} are the only ones with 
<em>write</em> access to the state store,
+     * and should contain logic to keep the {@link StateStore} up-to-date.
+     *
+     * <p>Global state stores are always enabled for fault-tolerance and 
recovery.
+     * In contrast to {@link #addStateStore(StoreBuilder, String...) "regular" 
state stores} no dedicated changelog
+     * topic will be created in Kafka; instead, the source topic is used for 
recovery.
+     * Thus, the source topic should be configured with log compaction.
+     *
+     * @param storeBuilder
+     *        the {@link StoreBuilder} used to obtain the {@link StateStore 
state store} (one per {@link KafkaStreams} instance)
+     * @param sourceName
+     *        the unique name of the internally added source
+     * @param keyDeserializer
+     *        the {@link Deserializer} for record keys
+     *        (can be {@code null} to use the default key deserializer from 
{@link StreamsConfig})
+     * @param valueDeserializer
+     *        the {@link Deserializer} for record values
+     *        (can be {@code null} to use the default value deserializer from 
{@link StreamsConfig})
+     * @param topic
+     *        the source topic to read the data from
+     * @param processorName
+     *        the unique name of the internally added processor which 
maintains the state store
+     * @param stateUpdateSupplier
+     *        the supplier used to obtain {@link Processor} instances, which 
maintain the state store
+     *
+     * @return itself
+     *
+     * @throws TopologyException
+     *         if the {@link StoreBuilder#name() state store} was already 
added, or
+     *         if the source or processor names are not unique, or
+     *         if the source topic has already been registered by another
+     *         {@link #addSource(String, String...) source},
+     *         {@link #addReadOnlyStateStore(StoreBuilder, String, 
Deserializer, Deserializer, String, String, ProcessorSupplier) read-only state 
store}, or
+     *         global state store
+     */
+    public synchronized <K, V, S extends StateStore> Topology addGlobalStore(
+        final StoreBuilder<S> storeBuilder,
+        final String sourceName,
+        final Deserializer<K> keyDeserializer,
+        final Deserializer<V> valueDeserializer,
+        final String topic,
+        final String processorName,
+        final ProcessorSupplier<K, V, Void, Void> stateUpdateSupplier
+    ) {
+        Objects.requireNonNull(storeBuilder, "storeBuilder cannot be null");
+        Objects.requireNonNull(stateUpdateSupplier, "stateUpdateSupplier 
cannot be null");
+
         internalTopologyBuilder.addGlobalStore(
             sourceName,
             null,
@@ -1106,37 +869,18 @@ public class Topology {
     }
 
     /**
-     * Adds a global {@link StateStore} to the topology.
-     * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
-     * There will be exactly one instance of this {@link StateStore} per Kafka 
Streams instance.
-     * <p>
-     * A {@link SourceNode} with the provided sourceName will be added to 
consume the data arriving from the partitions
-     * of the input topic.
-     * <p>
-     * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all
-     * records forwarded from the {@link SourceNode}.
-     * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
-     *
-     * @param storeBuilder          user defined key value store builder
-     * @param sourceName            name of the {@link SourceNode} that will 
be automatically added
-     * @param timestampExtractor    the stateless timestamp extractor used for 
this source,
-     *                              if not specified the default extractor 
defined in the configs will be used
-     * @param keyDeserializer       the {@link Deserializer} to deserialize 
keys with
-     * @param valueDeserializer     the {@link Deserializer} to deserialize 
values with
-     * @param topic                 the topic to source the data from
-     * @param processorName         the name of the {@link ProcessorSupplier}
-     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
-     * @return itself
-     * @throws TopologyException if the processor of state is already 
registered
+     * See {@link #addGlobalStore(StoreBuilder, String, Deserializer, 
Deserializer, String, String, ProcessorSupplier)}.
      */
-    public synchronized <KIn, VIn> Topology addGlobalStore(final 
StoreBuilder<?> storeBuilder,
-                                                           final String 
sourceName,
-                                                           final 
TimestampExtractor timestampExtractor,
-                                                           final 
Deserializer<KIn> keyDeserializer,
-                                                           final 
Deserializer<VIn> valueDeserializer,
-                                                           final String topic,
-                                                           final String 
processorName,
-                                                           final 
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+    public synchronized <K, V, S extends StateStore> Topology addGlobalStore(
+        final StoreBuilder<S> storeBuilder,
+        final String sourceName,
+        final TimestampExtractor timestampExtractor,
+        final Deserializer<K> keyDeserializer,
+        final Deserializer<V> valueDeserializer,
+        final String topic,
+        final String processorName,
+        final ProcessorSupplier<K, V, Void, Void> stateUpdateSupplier
+    ) {
         internalTopologyBuilder.addGlobalStore(
             sourceName,
             timestampExtractor,
@@ -1151,12 +895,22 @@ public class Topology {
     }
 
     /**
-     * Connects the processor and the state stores.
+     * Connect a {@link #addProcessor(String, ProcessorSupplier, String...) 
processor} to one or more
+     * {@link StateStore state stores}.
+     * The state stores must have been previously added to the topology via
+     * {@link #addStateStore(StoreBuilder, String...)}, or
+     * {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, 
Deserializer, String, String, ProcessorSupplier)}.
+     *
+     * @param processorName
+     *        the name of the processor
+     * @param stateStoreNames
+     *        the names of state stores that the processor should be able to 
access
      *
-     * @param processorName the name of the processor
-     * @param stateStoreNames the names of state stores that the processor uses
      * @return itself
-     * @throws TopologyException if the processor or a state store is unknown
+     *
+     * @throws TopologyException
+     *         if the processor name or a state store name is unknown, or
+     *         if the processor name specifies a source or sink
      */
     public synchronized Topology connectProcessorAndStateStores(final String 
processorName,
                                                                 final 
String... stateStoreNames) {
@@ -1167,9 +921,8 @@ public class Topology {
     /**
      * Returns a description of the specified {@code Topology}.
      *
-     * @return a description of the topology.
+     * @return A description of the topology.
      */
-
     public synchronized TopologyDescription describe() {
         return internalTopologyBuilder.describe();
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java 
b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
index cce48cd0925..bd0fc41265e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.internals;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Objects;
 import java.util.function.Supplier;
 
 import static java.lang.String.format;
@@ -80,9 +81,11 @@ public final class ApiUtils {
     /**
      * @throws IllegalArgumentException if the same instance is obtained each 
time
      */
-    public static void checkSupplier(final Supplier<?> supplier) {
-        if (supplier.get() == supplier.get()) {
-            final String supplierClass = supplier.getClass().getName();
+    public static void checkSupplier(final Supplier<?> processorSupplier) {
+        Objects.requireNonNull(processorSupplier, "processorSupplier cannot be 
null");
+
+        if (processorSupplier.get() == processorSupplier.get()) {
+            final String supplierClass = 
processorSupplier.getClass().getName();
             throw new IllegalArgumentException(String.format("%s generates 
single reference." +
                     " %s#get() must return a new object each time it is 
called.", supplierClass, supplierClass));
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 0baa8fbb72e..ee2c2b5a14d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -1237,10 +1237,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         final Named named,
         final String... stateStoreNames
     ) {
-        Objects.requireNonNull(processorSupplier, "processorSupplier can't be 
null");
+        ApiUtils.checkSupplier(processorSupplier);
         Objects.requireNonNull(named, "named can't be null");
         Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a 
null array");
-        ApiUtils.checkSupplier(processorSupplier);
         for (final String stateStoreName : stateStoreNames) {
             Objects.requireNonNull(stateStoreName, "stateStoreNames can't be 
null");
         }
@@ -1282,10 +1281,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         final Named named,
         final String... stateStoreNames
     ) {
-        Objects.requireNonNull(processorSupplier, "processorSupplier can't be 
null");
+        ApiUtils.checkSupplier(processorSupplier);
         Objects.requireNonNull(named, "named can't be null");
         Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a 
null array");
-        ApiUtils.checkSupplier(processorSupplier);
         for (final String stateStoreName : stateStoreNames) {
             Objects.requireNonNull(stateStoreName, "stateStoreNames can't be 
null");
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 5e776a5c733..192f44dbe34 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -103,13 +103,16 @@ public class TableSourceNode<K, V> extends 
SourceGraphNode<K, V> {
                                       consumedInternal().valueDeserializer(),
                                       topicName);
 
-            processorParameters.addProcessorTo(topologyBuilder, new String[] 
{sourceName});
+            processorParameters.addProcessorTo(topologyBuilder, sourceName);
 
             // if the KTableSource should not be materialized, stores will be 
null or empty
             final KTableSource<K, V> tableSource = (KTableSource<K, V>) 
processorParameters.processorSupplier();
             if (tableSource.stores() != null) {
                 if (shouldReuseSourceTopicForChangelog) {
+                    // TODO: rewrite this part to use 
Topology.addReadOnlyStateStore() instead
+                    // should allow to move off using 
`InternalTopologyBuilder` in favor of the public `Topology` API
                     tableSource.stores().forEach(store -> {
+                        // connect the source topic as (read-only) changelog 
topic for fault-tolerance
                         store.withLoggingDisabled();
                         
topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName);
                     });
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 2ff87ef19fa..a2376b9d984 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -74,6 +74,8 @@ public class InternalTopologyBuilder {
     }
 
     public InternalTopologyBuilder(final TopologyConfig topologyConfigs) {
+        Objects.requireNonNull(topologyConfigs, "topologyConfigs cannot be 
null");
+
         this.topologyConfigs = topologyConfigs;
         this.topologyName = topologyConfigs.topologyName;
 
@@ -350,11 +352,11 @@ public class InternalTopologyBuilder {
         private final Serializer<KIn> keySerializer;
         private final Serializer<VIn> valSerializer;
         private final StreamPartitioner<? super KIn, ? super VIn> partitioner;
-        private final TopicNameExtractor<KIn, VIn> topicExtractor;
+        private final TopicNameExtractor<? super KIn, ? super VIn> 
topicExtractor;
 
         private SinkNodeFactory(final String name,
                                 final String[] predecessors,
-                                final TopicNameExtractor<KIn, VIn> 
topicExtractor,
+                                final TopicNameExtractor<? super KIn, ? super 
VIn> topicExtractor,
                                 final Serializer<KIn> keySerializer,
                                 final Serializer<VIn> valSerializer,
                                 final StreamPartitioner<? super KIn, ? super 
VIn> partitioner) {
@@ -368,7 +370,7 @@ public class InternalTopologyBuilder {
         @Override
         public ProcessorNode<KIn, VIn, Void, Void> build() {
             if (topicExtractor instanceof StaticTopicNameExtractor) {
-                final String topic = ((StaticTopicNameExtractor<KIn, VIn>) 
topicExtractor).topicName;
+                final String topic = ((StaticTopicNameExtractor<?, ?>) 
topicExtractor).topicName;
                 if (internalTopicNamesWithProperties.containsKey(topic)) {
                     // prefix the internal topic name with the application id
                     return new SinkNode<>(name, new 
StaticTopicNameExtractor<>(decorateTopic(topic)), keySerializer, valSerializer, 
partitioner);
@@ -447,18 +449,23 @@ public class InternalTopologyBuilder {
         return this;
     }
 
+    private void verifyName(final String name) {
+        Objects.requireNonNull(name, "name cannot be null");
+        if (nodeFactories.containsKey(name)) {
+            throw new TopologyException("Processor " + name + " is already 
added.");
+        }
+    }
+
     public final void addSource(final AutoOffsetResetInternal offsetReset,
                                 final String name,
                                 final TimestampExtractor timestampExtractor,
                                 final Deserializer<?> keyDeserializer,
                                 final Deserializer<?> valDeserializer,
                                 final String... topics) {
+        verifyName(name);
+        Objects.requireNonNull(topics, "topics cannot be null");
         if (topics.length == 0) {
-            throw new TopologyException("You must provide at least one topic");
-        }
-        Objects.requireNonNull(name, "name must not be null");
-        if (nodeFactories.containsKey(name)) {
-            throw new TopologyException("Processor " + name + " is already 
added.");
+            throw new TopologyException("topics cannot be empty");
         }
 
         for (final String topic : topics) {
@@ -480,12 +487,8 @@ public class InternalTopologyBuilder {
                                 final Deserializer<?> keyDeserializer,
                                 final Deserializer<?> valDeserializer,
                                 final Pattern topicPattern) {
-        Objects.requireNonNull(topicPattern, "topicPattern can't be null");
-        Objects.requireNonNull(name, "name can't be null");
-
-        if (nodeFactories.containsKey(name)) {
-            throw new TopologyException("Processor " + name + " is already 
added.");
-        }
+        verifyName(name);
+        Objects.requireNonNull(topicPattern, "topicPattern cannot be null");
 
         for (final String sourceTopicName : rawSourceTopicNames) {
             if (topicPattern.matcher(sourceTopicName).matches()) {
@@ -507,46 +510,23 @@ public class InternalTopologyBuilder {
                                      final Serializer<V> valSerializer,
                                      final StreamPartitioner<? super K, ? 
super V> partitioner,
                                      final String... predecessorNames) {
-        Objects.requireNonNull(name, "name must not be null");
-        Objects.requireNonNull(topic, "topic must not be null");
-        Objects.requireNonNull(predecessorNames, "predecessor names must not 
be null");
-        if (predecessorNames.length == 0) {
-            throw new TopologyException("Sink " + name + " must have at least 
one parent");
-        }
+        verifyName(name);
+        Objects.requireNonNull(topic, "topic cannot be null");
+        verifyParents(name, predecessorNames);
 
         addSink(name, new StaticTopicNameExtractor<>(topic), keySerializer, 
valSerializer, partitioner, predecessorNames);
         nodeToSinkTopic.put(name, topic);
-        nodeGroups = null;
     }
 
     public final <K, V> void addSink(final String name,
-                                     final TopicNameExtractor<K, V> 
topicExtractor,
+                                     final TopicNameExtractor<? super K, ? 
super V> topicExtractor,
                                      final Serializer<K> keySerializer,
                                      final Serializer<V> valSerializer,
                                      final StreamPartitioner<? super K, ? 
super V> partitioner,
                                      final String... predecessorNames) {
-        Objects.requireNonNull(name, "name must not be null");
-        Objects.requireNonNull(topicExtractor, "topic extractor must not be 
null");
-        Objects.requireNonNull(predecessorNames, "predecessor names must not 
be null");
-        if (nodeFactories.containsKey(name)) {
-            throw new TopologyException("Processor " + name + " is already 
added.");
-        }
-        if (predecessorNames.length == 0) {
-            throw new TopologyException("Sink " + name + " must have at least 
one parent");
-        }
-
-        for (final String predecessor : predecessorNames) {
-            Objects.requireNonNull(predecessor, "predecessor name can't be 
null");
-            if (predecessor.equals(name)) {
-                throw new TopologyException("Processor " + name + " cannot be 
a predecessor of itself.");
-            }
-            if (!nodeFactories.containsKey(predecessor)) {
-                throw new TopologyException("Predecessor processor " + 
predecessor + " is not added yet.");
-            }
-            if (nodeToSinkTopic.containsKey(predecessor)) {
-                throw new TopologyException("Sink " + predecessor + " cannot 
be used a parent.");
-            }
-        }
+        verifyName(name);
+        Objects.requireNonNull(topicExtractor, "topicExtractor cannot be 
null");
+        verifyParents(name, predecessorNames);
 
         nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, 
topicExtractor, keySerializer, valSerializer, partitioner));
         nodeGrouper.add(name);
@@ -554,64 +534,50 @@ public class InternalTopologyBuilder {
         nodeGroups = null;
     }
 
-    public final <KIn, VIn, KOut, VOut> void addProcessor(final String name,
-                                                          final 
ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
-                                                          final String... 
predecessorNames) {
-        Objects.requireNonNull(name, "name must not be null");
-        Objects.requireNonNull(supplier, "supplier must not be null");
-        Objects.requireNonNull(predecessorNames, "predecessor names must not 
be null");
-        ApiUtils.checkSupplier(supplier);
-        if (nodeFactories.containsKey(name)) {
-            throw new TopologyException("Processor " + name + " is already 
added.");
-        }
-        if (predecessorNames.length == 0) {
-            throw new TopologyException("Processor " + name + " must have at 
least one parent");
-        }
+    public final void addProcessor(final String name,
+                                   final ProcessorSupplier<?, ?, ?, ?> 
processorSupplier,
+                                   final String... predecessorNames) {
+        verifyName(name);
+        ApiUtils.checkSupplier(processorSupplier);
+        verifyParents(name, predecessorNames);
 
-        for (final String predecessor : predecessorNames) {
-            Objects.requireNonNull(predecessor, "predecessor name must not be 
null");
-            if (predecessor.equals(name)) {
-                throw new TopologyException("Processor " + name + " cannot be 
a predecessor of itself.");
-            }
-            if (!nodeFactories.containsKey(predecessor)) {
-                throw new TopologyException("Predecessor processor " + 
predecessor + " is not added yet for " + name);
-            }
-        }
-
-        nodeFactories.put(name, new ProcessorNodeFactory<>(name, 
predecessorNames, supplier));
+        nodeFactories.put(name, new ProcessorNodeFactory<>(name, 
predecessorNames, processorSupplier));
         nodeGrouper.add(name);
         nodeGrouper.unite(name, predecessorNames);
         nodeGroups = null;
     }
 
     public final <KIn, VIn, VOut> void addProcessor(final String name,
-                                                    final 
FixedKeyProcessorSupplier<KIn, VIn, VOut> supplier,
+                                                    final 
FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier,
                                                     final String... 
predecessorNames) {
-        Objects.requireNonNull(name, "name must not be null");
-        Objects.requireNonNull(supplier, "supplier must not be null");
-        Objects.requireNonNull(predecessorNames, "predecessor names must not 
be null");
-        ApiUtils.checkSupplier(supplier);
-        if (nodeFactories.containsKey(name)) {
-            throw new TopologyException("Processor " + name + " is already 
added.");
-        }
+        verifyName(name);
+        ApiUtils.checkSupplier(processorSupplier);
+        verifyParents(name, predecessorNames);
+
+        nodeFactories.put(name, new FixedKeyProcessorNodeFactory<>(name, 
predecessorNames, processorSupplier));
+        nodeGrouper.add(name);
+        nodeGrouper.unite(name, predecessorNames);
+        nodeGroups = null;
+    }
+
+    private void verifyParents(final String processorName, final String... 
predecessorNames) {
+        Objects.requireNonNull(predecessorNames, "predecessorNames must not be 
null");
         if (predecessorNames.length == 0) {
-            throw new TopologyException("Processor " + name + " must have at 
least one parent");
+            throw new TopologyException("predecessorNames cannot be empty");
         }
 
         for (final String predecessor : predecessorNames) {
-            Objects.requireNonNull(predecessor, "predecessor name must not be 
null");
-            if (predecessor.equals(name)) {
-                throw new TopologyException("Processor " + name + " cannot be 
a predecessor of itself.");
-            }
+            Objects.requireNonNull(predecessor, "predecessor name cannot be 
null");
             if (!nodeFactories.containsKey(predecessor)) {
-                throw new TopologyException("Predecessor processor " + 
predecessor + " is not added yet for " + name);
+                if (predecessor.equals(processorName)) {
+                    throw new TopologyException("Predecessor " + predecessor + 
" is unknown (self-reference).");
+                }
+                throw new TopologyException("Predecessor " + predecessor + " 
is unknown.");
+            }
+            if (nodeToSinkTopic.containsKey(predecessor)) {
+                throw new TopologyException("Sink " + predecessor + " cannot 
be used a parent.");
             }
         }
-
-        nodeFactories.put(name, new FixedKeyProcessorNodeFactory<>(name, 
predecessorNames, supplier));
-        nodeGrouper.add(name);
-        nodeGrouper.unite(name, predecessorNames);
-        nodeGroups = null;
     }
 
     public final void addStateStore(final StoreBuilder<?> storeBuilder,
@@ -640,10 +606,11 @@ public class InternalTopologyBuilder {
 
         if (processorNames != null) {
             for (final String processorName : processorNames) {
-                Objects.requireNonNull(processorName, "processor name must not 
be null");
+                Objects.requireNonNull(processorName, "processor cannot not be 
null");
                 connectProcessorAndStateStore(processorName, 
storeFactory.storeName());
             }
         }
+
         nodeGroups = null;
     }
 
@@ -655,22 +622,33 @@ public class InternalTopologyBuilder {
                                                 final String processorName,
                                                 final ProcessorSupplier<KIn, 
VIn, Void, Void> stateUpdateSupplier,
                                                 final boolean 
reprocessOnRestore) {
+        verifyName(sourceName);
+
+        Objects.requireNonNull(topic, "topic cannot be null");
+        validateTopicNotAlreadyRegistered(topic);
+
+        verifyName(processorName);
+        if (sourceName.equals(processorName)) {
+            throw new TopologyException("sourceName and processorName must be 
different.");
+        }
+
         ApiUtils.checkSupplier(stateUpdateSupplier);
         final Set<StoreBuilder<?>> stores = stateUpdateSupplier.stores();
         if (stores == null || stores.size() != 1) {
             throw new IllegalArgumentException(
-                    "Global stores must pass in suppliers with exactly one 
store but got " +
-                            (stores != null ? stores.size() : 0));
+                "Global stores must pass in suppliers with exactly one store 
but got " +
+                    (stores != null ? stores.size() : 0));
         }
         final StoreFactory storeFactory =
-                StoreBuilderWrapper.wrapStoreBuilder(stores.iterator().next());
-        validateGlobalStoreArguments(sourceName,
-                                     topic,
-                                     processorName,
-                                     stateUpdateSupplier,
-                                     storeFactory.storeName(),
-                                     storeFactory.loggingEnabled());
-        validateTopicNotAlreadyRegistered(topic);
+            StoreBuilderWrapper.wrapStoreBuilder(stores.iterator().next());
+
+        final String storeName = storeFactory.storeName();
+        if (stateFactories.containsKey(storeName)) {
+            throw new TopologyException("A different StateStore has already 
been added with the name " + storeName);
+        }
+        if (globalStateBuilders.containsKey(storeName)) {
+            throw new TopologyException("A different GlobalStateStore has 
already been added with the name " + storeName);
+        }
 
         final String[] topics = {topic};
         final String[] predecessors = {sourceName};
@@ -701,6 +679,8 @@ public class InternalTopologyBuilder {
         nodeGrouper.add(processorName);
         nodeGrouper.unite(processorName, predecessors);
         globalStateBuilders.put(storeFactory.storeName(), storeFactory);
+        // connect the source topic as (read-only) changelog topic for 
fault-tolerance
+        storeFactory.withLoggingDisabled();
         connectSourceStoreAndTopic(storeFactory.storeName(), topic);
         nodeGroups = null;
     }
@@ -728,13 +708,21 @@ public class InternalTopologyBuilder {
 
     public final void connectProcessorAndStateStores(final String 
processorName,
                                                      final String... 
stateStoreNames) {
-        Objects.requireNonNull(processorName, "processorName can't be null");
-        Objects.requireNonNull(stateStoreNames, "state store list must not be 
null");
+        Objects.requireNonNull(processorName, "processorName cannot be null");
+        Objects.requireNonNull(stateStoreNames, "stateStoreNames cannot  
null");
         if (stateStoreNames.length == 0) {
-            throw new TopologyException("Must provide at least one state store 
name.");
+            throw new TopologyException("stateStoreNames cannot be empty");
+        }
+
+        if (nodeToSourceTopics.containsKey(processorName)
+            || nodeToSourcePatterns.containsKey(processorName)
+            || nodeToSinkTopic.containsKey(processorName)) {
+            throw new TopologyException("State stores cannot be connect to 
sources or sinks.");
+
         }
+
         for (final String stateStoreName : stateStoreNames) {
-            Objects.requireNonNull(stateStoreName, "state store name must not 
be null");
+            Objects.requireNonNull(stateStoreName, "state store name cannot be 
null");
             connectProcessorAndStateStore(processorName, stateStoreName);
         }
         nodeGroups = null;
@@ -810,36 +798,6 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private void validateGlobalStoreArguments(final String sourceName,
-                                              final String topic,
-                                              final String processorName,
-                                              final ProcessorSupplier<?, ?, 
Void, Void> stateUpdateSupplier,
-                                              final String storeName,
-                                              final boolean loggingEnabled) {
-        Objects.requireNonNull(sourceName, "sourceName must not be null");
-        Objects.requireNonNull(topic, "topic must not be null");
-        Objects.requireNonNull(stateUpdateSupplier, "supplier must not be 
null");
-        Objects.requireNonNull(processorName, "processorName must not be 
null");
-        if (nodeFactories.containsKey(sourceName)) {
-            throw new TopologyException("Processor " + sourceName + " is 
already added.");
-        }
-        if (nodeFactories.containsKey(processorName)) {
-            throw new TopologyException("Processor " + processorName + " is 
already added.");
-        }
-        if (stateFactories.containsKey(storeName)) {
-            throw new TopologyException("A different StateStore has already 
been added with the name " + storeName);
-        }
-        if (globalStateBuilders.containsKey(storeName)) {
-            throw new TopologyException("A different GlobalStateStore has 
already been added with the name " + storeName);
-        }
-        if (loggingEnabled) {
-            throw new TopologyException("StateStore " + storeName + " for 
global table must not have logging enabled.");
-        }
-        if (sourceName.equals(processorName)) {
-            throw new TopologyException("sourceName and processorName must be 
different.");
-        }
-    }
-
     private void connectProcessorAndStateStore(final String processorName,
                                                final String stateStoreName) {
         if (globalStateBuilders.containsKey(stateStoreName)) {
@@ -878,7 +836,7 @@ public class InternalTopologyBuilder {
             if (nodeFactory instanceof SourceNodeFactory) {
                 sourceNodes.add((SourceNodeFactory<?, ?>) nodeFactory);
             } else if (nodeFactory instanceof ProcessorNodeFactory) {
-                
sourceNodes.addAll(findSourcesForProcessorPredecessors(((ProcessorNodeFactory<?,
 ?, ?, ?>) nodeFactory).predecessors));
+                
sourceNodes.addAll(findSourcesForProcessorPredecessors(nodeFactory.predecessors));
             }
         }
         return sourceNodes;
@@ -1346,14 +1304,12 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private <S extends StateStore> InternalTopicConfig 
createChangelogTopicConfig(final StoreFactory factory,
-                                                                               
   final String name) {
+    private InternalTopicConfig createChangelogTopicConfig(final StoreFactory 
factory,
+                                                           final String name) {
         if (factory.isVersionedStore()) {
-            final VersionedChangelogTopicConfig config = new 
VersionedChangelogTopicConfig(name, factory.logConfig(), 
factory.historyRetention());
-            return config;
+            return new VersionedChangelogTopicConfig(name, 
factory.logConfig(), factory.historyRetention());
         } else if (factory.isWindowStore()) {
-            final WindowedChangelogTopicConfig config = new 
WindowedChangelogTopicConfig(name, factory.logConfig(), 
factory.retentionPeriod());
-            return config;
+            return new WindowedChangelogTopicConfig(name, factory.logConfig(), 
factory.retentionPeriod());
         } else {
             return new UnwindowedUnversionedChangelogTopicConfig(name, 
factory.logConfig());
         }
@@ -1923,9 +1879,10 @@ public class InternalTopologyBuilder {
     }
 
     public static final class Sink<K, V> extends AbstractNode implements 
TopologyDescription.Sink {
-        private final TopicNameExtractor<K, V> topicNameExtractor;
+        private final TopicNameExtractor<? super K, ? super V> 
topicNameExtractor;
+
         public Sink(final String name,
-                    final TopicNameExtractor<K, V> topicNameExtractor) {
+                    final TopicNameExtractor<? super K, ? super V> 
topicNameExtractor) {
             super(name);
             this.topicNameExtractor = topicNameExtractor;
         }
@@ -1939,14 +1896,14 @@ public class InternalTopologyBuilder {
         @Override
         public String topic() {
             if (topicNameExtractor instanceof StaticTopicNameExtractor) {
-                return ((StaticTopicNameExtractor<K, V>) 
topicNameExtractor).topicName;
+                return ((StaticTopicNameExtractor<?, ?>) 
topicNameExtractor).topicName;
             } else {
                 return null;
             }
         }
 
         @Override
-        public TopicNameExtractor<K, V> topicNameExtractor() {
+        public TopicNameExtractor<? super K, ? super V> topicNameExtractor() {
             if (topicNameExtractor instanceof StaticTopicNameExtractor) {
                 return null;
             } else {
@@ -1968,7 +1925,6 @@ public class InternalTopologyBuilder {
                 + nodeNames(predecessors);
         }
 
-        @SuppressWarnings("unchecked")
         @Override
         public boolean equals(final Object o) {
             if (this == o) {
@@ -1978,7 +1934,7 @@ public class InternalTopologyBuilder {
                 return false;
             }
 
-            final Sink<K, V> sink = (Sink<K, V>) o;
+            final Sink<?, ?> sink = (Sink<?, ?>) o;
             return name.equals(sink.name)
                 && topicNameExtractor.equals(sink.topicNameExtractor)
                 && predecessors.equals(sink.predecessors);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index c7dcf135eaa..f3baeb237d6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -30,13 +30,13 @@ public class SinkNode<KIn, VIn> extends ProcessorNode<KIn, 
VIn, Void, Void> {
 
     private Serializer<KIn> keySerializer;
     private Serializer<VIn> valSerializer;
-    private final TopicNameExtractor<KIn, VIn> topicExtractor;
+    private final TopicNameExtractor<? super KIn, ? super VIn> topicExtractor;
     private final StreamPartitioner<? super KIn, ? super VIn> partitioner;
 
     private InternalProcessorContext<Void, Void> context;
 
     SinkNode(final String name,
-             final TopicNameExtractor<KIn, VIn> topicExtractor,
+             final TopicNameExtractor<? super KIn, ? super VIn> topicExtractor,
              final Serializer<KIn> keySerializer,
              final Serializer<VIn> valSerializer,
              final StreamPartitioner<? super KIn, ? super VIn> partitioner) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 0cb91b12a58..81986f49032 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -423,10 +423,8 @@ public class TopologyTest {
         }
     }
 
-    @Deprecated // testing old PAPI
     @Test
     public void 
shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
-        when(globalStoreBuilder.name()).thenReturn("anyName");
         assertThrows(TopologyException.class, () -> topology.addGlobalStore(
             globalStoreBuilder,
             "sameName",
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 26b2ea197c3..a351a6a812c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -507,7 +507,7 @@ public class KStreamImplTest {
     public void shouldNotAllowNullGroupedOnGroupBy() {
         final NullPointerException exception = assertThrows(
             NullPointerException.class,
-            () -> testStream.groupBy((k, v) -> k, (Grouped<String, String>) 
null));
+            () -> testStream.groupBy((k, v) -> k, null));
         assertThat(exception.getMessage(), equalTo("grouped can't be null"));
     }
 
@@ -515,7 +515,7 @@ public class KStreamImplTest {
     public void shouldNotAllowNullGroupedOnGroupByKey() {
         final NullPointerException exception = assertThrows(
             NullPointerException.class,
-            () -> testStream.groupByKey((Grouped<String, String>) null));
+            () -> testStream.groupByKey(null));
         assertThat(exception.getMessage(), equalTo("grouped can't be null"));
     }
 
@@ -646,7 +646,7 @@ public class KStreamImplTest {
                 testStream,
                 MockValueJoiner.TOSTRING_JOINER,
                 JoinWindows.of(ofMillis(10)),
-                (StreamJoined<String, String, String>) null));
+                null));
         assertThat(exception.getMessage(), equalTo("streamJoined can't be null"));
     }
 
@@ -746,7 +746,7 @@ public class KStreamImplTest {
                 testStream,
                 MockValueJoiner.TOSTRING_JOINER,
                 JoinWindows.of(ofMillis(10)),
-                (StreamJoined<String, String, String>) null));
+                null));
         assertThat(exception.getMessage(), equalTo("streamJoined can't be null"));
     }
 
@@ -845,7 +845,7 @@ public class KStreamImplTest {
                 testStream,
                 MockValueJoiner.TOSTRING_JOINER,
                 JoinWindows.of(ofMillis(10)),
-                (StreamJoined<String, String, String>) null));
+                null));
         assertThat(exception.getMessage(), equalTo("streamJoined can't be null"));
     }
 
@@ -1595,7 +1595,7 @@ public class KStreamImplTest {
         final NullPointerException exception = assertThrows(
             NullPointerException.class,
             () -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null));
-        assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
+        assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
     }
 
     @Test
@@ -1604,7 +1604,7 @@ public class KStreamImplTest {
             NullPointerException.class,
             () -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
                                      "storeName"));
-        assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
+        assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
     }
 
     @Test
@@ -1613,7 +1613,7 @@ public class KStreamImplTest {
             NullPointerException.class,
             () -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
                                      Named.as("processor")));
-        assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
+        assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
     }
 
     @Test
@@ -1622,7 +1622,7 @@ public class KStreamImplTest {
             NullPointerException.class,
             () -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
                                      Named.as("processor"), "stateStore"));
-        assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
+        assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
     }
 
     @Test
@@ -1678,7 +1678,7 @@ public class KStreamImplTest {
         final NullPointerException exception = assertThrows(
             NullPointerException.class,
             () -> testStream.processValues((FixedKeyProcessorSupplier<? super String, ? super String, Void>) null));
-        assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
+        assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
     }
 
     @Test
@@ -1687,7 +1687,7 @@ public class KStreamImplTest {
             NullPointerException.class,
             () -> testStream.processValues((FixedKeyProcessorSupplier<? super String, ? super String, Void>) null,
                 "storeName"));
-        assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
+        assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
     }
 
     @Test
@@ -1696,7 +1696,7 @@ public class KStreamImplTest {
             NullPointerException.class,
             () -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
                 Named.as("processor")));
-        assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
+        assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
     }
 
     @Test
@@ -1705,7 +1705,7 @@ public class KStreamImplTest {
             NullPointerException.class,
             () -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
                 Named.as("processor"), "stateStore"));
-        assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
+        assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 5802518cd26..366aa6636d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -549,7 +549,7 @@ public class InternalTopologyBuilderTest {
             new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled();
 
         builder.addGlobalStore(
-            "global-store",
+            "global-source",
             null,
             null,
             null,
@@ -562,11 +562,11 @@ public class InternalTopologyBuilderTest {
         final TopologyException exception = assertThrows(
             TopologyException.class,
             () -> builder.addGlobalStore(
-                "global-store-2",
+                "global-source-2",
                 null,
                 null,
                 null,
-                "global-topic",
+                "global-topic-2",
                 "global-processor-2",
                 new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(secondGlobalBuilder)),
                 false

Reply via email to