MINOR: update JavaDocs for Kafka Streams DSL helpers

 - also deprecate ZK config for Streams

Author: Matthias J. Sax <[email protected]>

Reviewers: Ismael Juma, Guozhang Wang

Closes #2459 from mjsax/javaDocImprovements8

(cherry picked from commit 4c42654b1eecebae272dfe5ce018b85ad4db7709)
Signed-off-by: Guozhang Wang <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/903548f1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/903548f1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/903548f1

Branch: refs/heads/0.10.2
Commit: 903548f1a0f4365ae11315a4c36a5e80b90d1ff8
Parents: 05b1ebd
Author: Matthias J. Sax <[email protected]>
Authored: Fri Jan 27 16:48:44 2017 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Fri Jan 27 16:48:52 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/KafkaClientSupplier.java      |  42 +-
 .../org/apache/kafka/streams/KafkaStreams.java  | 393 ++++++-----
 .../java/org/apache/kafka/streams/KeyValue.java |  30 +-
 .../org/apache/kafka/streams/StreamsConfig.java | 648 ++++++++++---------
 .../apache/kafka/streams/StreamsMetrics.java    | 116 ++--
 .../kafka/streams/kstream/KGroupedStream.java   |   6 +-
 .../kafka/streams/kstream/KStreamBuilder.java   | 483 +++++++++-----
 .../processor/WallclockTimestampExtractor.java  |   1 +
 .../processor/internals/StreamsKafkaClient.java |  12 +-
 .../GlobalKTableIntegrationTest.java            |   1 -
 .../integration/JoinIntegrationTest.java        |   1 -
 .../KTableKTableJoinIntegrationTest.java        |   1 -
 12 files changed, 1043 insertions(+), 691 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/903548f1/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
index e0312f9..ed3d488 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
@@ -17,31 +17,43 @@
 
 package org.apache.kafka.streams;
 
-import java.util.Map;
-
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+
+import java.util.Map;
 
+/**
+ * {@code KafkaClientSupplier} can be used to provide custom Kafka clients to 
a {@link KafkaStreams} instance.
+ *
+ * @see 
KafkaStreams#KafkaStreams(org.apache.kafka.streams.processor.TopologyBuilder, 
StreamsConfig, KafkaClientSupplier)
+ */
 public interface KafkaClientSupplier {
     /**
-     * Creates an instance of Producer which is used to produce records.
-     * @param config producer config which supplied by {@link StreamsConfig} 
given to {@link KafkaStreams}
-     * @return an instance of kafka Producer
+     * Create a {@link Producer} which is used to write records to sink topics.
+     *
+     * @param config {@link StreamsConfig#getProducerConfigs(String) producer 
config} which is supplied by the
+     *               {@link StreamsConfig} given to the {@link KafkaStreams} 
instance
+     * @return an instance of Kafka producer
      */
-    Producer<byte[], byte[]> getProducer(Map<String, Object> config);
+    Producer<byte[], byte[]> getProducer(final Map<String, Object> config);
 
     /**
-     * Creates an instance of Consumer which is used to consume records of 
source topics.
-     * @param config consumer config which supplied by {@link StreamsConfig} 
given to {@link KafkaStreams}
-     * @return an instance of kafka Consumer
+     * Create a {@link Consumer} which is used to read records of source 
topics.
+     *
+     * @param config {@link StreamsConfig#getConsumerConfigs(StreamThread, 
String, String) consumer config} which is
+     *               supplied by the {@link StreamsConfig} given to the {@link 
KafkaStreams} instance
+     * @return an instance of Kafka consumer
      */
-    Consumer<byte[], byte[]> getConsumer(Map<String, Object> config);
+    Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config);
 
     /**
-     * Creates an instance of Consumer which is used to consume records of 
internal topics.
-     * @param config restore consumer config which supplied by {@link 
StreamsConfig} given to
-     * {@link KafkaStreams}
-     * @return an instance of kafka Consumer
+     * Create a {@link Consumer} which is used to read records to restore 
{@link StateStore}s.
+     *
+     * @param config {@link StreamsConfig#getRestoreConsumerConfigs(String) 
restore consumer config} which is supplied
+     *               by the {@link StreamsConfig} given to the {@link 
KafkaStreams}
+     * @return an instance of Kafka consumer
      */
-    Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config);
+    Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> 
config);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/903548f1/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 27c8d22..ef0e72d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.streams;
 
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.annotation.InterfaceStability;
@@ -28,6 +31,12 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -67,43 +76,44 @@ import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
 
 /**
- * Kafka Streams allows for performing continuous computation on input coming 
from one or more input topics and
- * sends output to zero or more output topics.
+ * A Kafka client that allows for performing continuous computation on input 
coming from one or more input topics and
+ * sends output to zero, one, or more output topics.
  * <p>
- * The computational logic can be specified either by using the {@link 
TopologyBuilder} class to define the a DAG topology of
- * {@link org.apache.kafka.streams.processor.Processor}s or by using the 
{@link org.apache.kafka.streams.kstream.KStreamBuilder}
- * class which provides the high-level {@link 
org.apache.kafka.streams.kstream.KStream} DSL to define the transformation.
+ * The computational logic can be specified either by using the {@link 
TopologyBuilder} to define a DAG topology of
+ * {@link Processor}s or by using the {@link KStreamBuilder} which provides 
the high-level DSL to define transformations.
  * <p>
- * The {@link KafkaStreams} class manages the lifecycle of a Kafka Streams 
instance. One stream instance can contain one or
- * more threads specified in the configs for the processing work.
+ * One {@code KafkaStreams} instance can contain one or more threads specified 
in the configs for the processing work.
  * <p>
- * A {@link KafkaStreams} instance can co-ordinate with any other instances 
with the same application ID (whether in this same process, on other processes
- * on this machine, or on remote machines) as a single (possibly distributed) 
stream processing client. These instances will divide up the work
- * based on the assignment of the input topic partitions so that all 
partitions are being
- * consumed. If instances are added or failed, all instances will rebalance 
the partition assignment among themselves
- * to balance processing load.
- * <p>
- * Internally the {@link KafkaStreams} instance contains a normal {@link 
org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
- * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} 
instance that is used for reading input and writing output.
+ * A {@code KafkaStreams} instance can co-ordinate with any other instances 
with the same
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} (whether in the 
same process, on other processes on this
+ * machine, or on remote machines) as a single (possibly distributed) stream 
processing application.
+ * These instances will divide up the work based on the assignment of the 
input topic partitions so that all partitions
+ * are being consumed.
+ * If instances are added or fail, all (remaining) instances will rebalance 
the partition assignment among themselves
+ * to balance processing load and ensure that all input topic partitions are 
processed.
  * <p>
+ * Internally a {@link KafkaStreams} instance contains a normal {@link 
KafkaProducer} and {@link KafkaConsumer} instance
+ * that is used for reading input and writing output.
  * <p>
  * A simple example might look like this:
- * <pre>
- *    Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
- *    props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"my-stream-processing-application");
- *    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- *    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
- *    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
- *    StreamsConfig config = new StreamsConfig(props);
+ * <pre>{@code
+ * Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
+ * props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"my-stream-processing-application");
+ * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ * props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ * props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+ * StreamsConfig config = new StreamsConfig(props);
+ *
+ * KStreamBuilder builder = new KStreamBuilder();
+ * builder.stream("my-input-topic").mapValues(value -&gt; 
value.length().toString()).to("my-output-topic");
  *
- *    KStreamBuilder builder = new KStreamBuilder();
- *    builder.stream("my-input-topic").mapValues(value -&gt; 
value.length().toString()).to("my-output-topic");
+ * KafkaStreams streams = new KafkaStreams(builder, config);
+ * streams.start();
+ * }</pre>
  *
- *    KafkaStreams streams = new KafkaStreams(builder, config);
- *    streams.start();
- * </pre>
+ * @see KStreamBuilder
+ * @see TopologyBuilder
  */
-
 @InterfaceStability.Unstable
 public class KafkaStreams {
 
@@ -163,15 +173,15 @@ public class KafkaStreams {
 
         private final Set<Integer> validTransitions = new HashSet<>();
 
-        State(final Integer...validTransitions) {
+        State(final Integer... validTransitions) {
             this.validTransitions.addAll(Arrays.asList(validTransitions));
         }
 
         public boolean isRunning() {
-            return this.equals(RUNNING) || this.equals(REBALANCING);
+            return equals(RUNNING) || equals(REBALANCING);
         }
         public boolean isCreatedOrRunning() {
-            return isRunning() || this.equals(CREATED);
+            return isRunning() || equals(CREATED);
         }
         public boolean isValidTransition(final State newState) {
             return validTransitions.contains(newState.ordinal());
@@ -179,33 +189,34 @@ public class KafkaStreams {
     }
     private volatile State state = KafkaStreams.State.CREATED;
     private StateListener stateListener = null;
-    private final StreamStateListener streamStateListener = new 
StreamStateListener();
+
 
     /**
-     * Listen to state change events
+     * Listen to {@link State} change events.
      */
     public interface StateListener {
 
         /**
-         * Called when state changes
-         * @param newState     current state
-         * @param oldState     previous state
+         * Called when state changes.
+         *
+         * @param newState new state
+         * @param oldState previous state
          */
         void onChange(final State newState, final State oldState);
     }
 
     /**
-     * An app can set {@link StateListener} so that the app is notified when 
state changes
-     * @param listener
+     * An app can set a single {@link StateListener} so that the app is 
notified when state changes.
+     * @param listener a new state listener
      */
     public void setStateListener(final StateListener listener) {
-        this.stateListener = listener;
+        stateListener = listener;
     }
 
-    private synchronized void setState(State newState) {
-        State oldState = state;
+    private synchronized void setState(final State newState) {
+        final State oldState = state;
         if (!state.isValidTransition(newState)) {
-            log.warn("Unexpected state transition from " + state + " to " + 
newState);
+            log.warn("Unexpected state transition from {} to {}.", oldState, 
newState);
         }
         state = newState;
         if (stateListener != null) {
@@ -213,31 +224,35 @@ public class KafkaStreams {
         }
     }
 
-
     /**
-     * @return The state this instance is in
+     * Return the current {@link State} of this {@code KafkaStreams} instance.
+     *
+     * @return the current state of this Kafka Streams instance
      */
     public synchronized State state() {
         return state;
     }
 
     /**
-     * Get read-only handle on global metrics registry
+     * Get read-only handle on global metrics registry.
+     *
      * @return Map of all metrics.
      */
     public Map<MetricName, ? extends Metric> metrics() {
-        return Collections.unmodifiableMap(this.metrics.metrics());
+        return Collections.unmodifiableMap(metrics.metrics());
     }
 
     private class StreamStateListener implements StreamThread.StateListener {
         @Override
-        public synchronized void onChange(final StreamThread thread, final 
StreamThread.State newState, final StreamThread.State oldState) {
+        public synchronized void onChange(final StreamThread thread,
+                                          final StreamThread.State newState,
+                                          final StreamThread.State oldState) {
             threadState.put(thread.getId(), newState);
             if (newState == StreamThread.State.PARTITIONS_REVOKED ||
                 newState == StreamThread.State.ASSIGNING_PARTITIONS) {
                 setState(State.REBALANCING);
             } else if (newState == StreamThread.State.RUNNING) {
-                for (StreamThread.State state : threadState.values()) {
+                for (final StreamThread.State state : threadState.values()) {
                     if (state != StreamThread.State.RUNNING) {
                         return;
                     }
@@ -246,35 +261,38 @@ public class KafkaStreams {
             }
         }
     }
+
     /**
-     * Construct the stream instance.
+     * Create a {@code KafkaStreams} instance.
      *
      * @param builder the processor topology builder specifying the 
computational logic
-     * @param props   properties for the {@link StreamsConfig}
+     * @param props   properties for {@link StreamsConfig}
      */
     public KafkaStreams(final TopologyBuilder builder, final Properties props) 
{
         this(builder, new StreamsConfig(props), new 
DefaultKafkaClientSupplier());
     }
 
     /**
-     * Construct the stream instance.
+     * Create a {@code KafkaStreams} instance.
      *
      * @param builder the processor topology builder specifying the 
computational logic
-     * @param config  the stream configs
+     * @param config  the Kafka Streams configuration
      */
     public KafkaStreams(final TopologyBuilder builder, final StreamsConfig 
config) {
         this(builder, config, new DefaultKafkaClientSupplier());
     }
 
     /**
-     * Construct the stream instance.
+     * Create a {@code KafkaStreams} instance.
      *
      * @param builder        the processor topology builder specifying the 
computational logic
-     * @param config         the stream configs
-     * @param clientSupplier the kafka clients supplier which provides 
underlying producer and consumer clients
-     *                       for this {@link KafkaStreams} instance
+     * @param config         the Kafka Streams configuration
+     * @param clientSupplier the Kafka clients supplier which provides 
underlying producer and consumer clients
+     *                       for the new {@code KafkaStreams} instance
      */
-    public KafkaStreams(final TopologyBuilder builder, final StreamsConfig 
config, final KafkaClientSupplier clientSupplier) {
+    public KafkaStreams(final TopologyBuilder builder,
+                        final StreamsConfig config,
+                        final KafkaClientSupplier clientSupplier) {
         // create the metrics
         final Time time = Time.SYSTEM;
 
@@ -338,7 +356,7 @@ public class KafkaStreams {
                                           time,
                                           streamsMetadataState,
                                           cacheSizeBytes);
-            threads[i].setStateListener(streamStateListener);
+            threads[i].setStateListener(new StreamStateListener());
             threadState.put(threads[i].getId(), threads[i].state());
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
@@ -360,7 +378,15 @@ public class KafkaStreams {
         return new HostInfo(host, port);
     }
 
-    private void checkBrokerVersionCompatibility() {
+    /**
+     * Check if the used brokers have version 0.10.1.x or higher.
+     * <p>
+     * Note, for <em>pre</em> 0.10.x brokers the broker version cannot be 
checked and the client will hang and retry
+     * until it {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}.
+     *
+     * @throws StreamsException if brokers have version 0.10.0.x
+     */
+    private void checkBrokerVersionCompatibility() throws StreamsException {
         final StreamsKafkaClient client = new StreamsKafkaClient(config);
 
         client.checkBrokerCompatibility();
@@ -374,12 +400,17 @@ public class KafkaStreams {
     }
 
     /**
-     * Start the stream instance by starting all its threads.
-     *
+     * Start the {@code KafkaStreams} instance by starting all its threads.
+     * <p>
+     * Note, for brokers with version {@code 0.9.x} or lower, the broker 
version cannot be checked.
+     * There will be no error and the client will hang and retry to verify the 
broker version until it
+     * {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}.
+     *
      * @throws IllegalStateException if process was already started
+     * @throws StreamsException if the Kafka brokers have version 0.10.0.x
      */
-    public synchronized void start() {
-        log.debug("Starting Kafka Stream process");
+    public synchronized void start() throws IllegalStateException, 
StreamsException {
+        log.debug("Starting Kafka Stream process.");
 
         if (state == KafkaStreams.State.CREATED) {
             checkBrokerVersionCompatibility();
@@ -400,9 +431,7 @@ public class KafkaStreams {
     }
 
     /**
-     * Shutdown this stream instance by signaling all the threads to stop,
-     * and then wait for them to join.
-     *
+     * Shutdown this {@code KafkaStreams} instance by signaling all the 
threads to stop, and then wait for them to join.
      * This will block until all threads have stopped.
      */
     public void close() {
@@ -410,17 +439,17 @@ public class KafkaStreams {
     }
 
     /**
-     * Shutdown this stream instance by signaling all the threads to stop,
-     * and then wait up to the timeout for the threads to join.
-     *
-     * A timeout of 0 means to wait forever
+     * Shutdown this {@code KafkaStreams} by signaling all the threads to 
stop, and then wait up to the timeout for the
+     * threads to join.
+     * A {@code timeout} of 0 means to wait forever.
      *
-     * @param timeout   how long to wait for {@link StreamThread}s to shutdown
-     * @param timeUnit  unit of time used for timeout
-     * @return true if all threads were successfully stopped
+     * @param timeout  how long to wait for the threads to shutdown
+     * @param timeUnit unit of time used for timeout
+     * @return {@code true} if all threads were successfully 
stopped&mdash;{@code false} if the timeout was reached
+     * before all threads stopped
      */
     public synchronized boolean close(final long timeout, final TimeUnit 
timeUnit) {
-        log.debug("Stopping Kafka Stream process");
+        log.debug("Stopping Kafka Stream process.");
         if (state.isCreatedOrRunning()) {
             setState(KafkaStreams.State.PENDING_SHUTDOWN);
             // save the current thread so that if it is a stream thread
@@ -428,42 +457,42 @@ public class KafkaStreams {
             final Thread shutdown = new Thread(new Runnable() {
                 @Override
                 public void run() {
-                        // signal the threads to stop and wait
-                        for (final StreamThread thread : threads) {
-                            // avoid deadlocks by stopping any further state 
reports
-                            // from the thread since we're shutting down
-                            thread.setStateListener(null);
-                            thread.close();
-                        }
-                        if (globalStreamThread != null) {
-                            globalStreamThread.close();
-                            if (!globalStreamThread.stillRunning()) {
-                                try {
-                                    globalStreamThread.join();
-                                } catch (InterruptedException e) {
-                                    Thread.interrupted();
-                                }
-                            }
-                        }
-                        for (final StreamThread thread : threads) {
+                    // signal the threads to stop and wait
+                    for (final StreamThread thread : threads) {
+                        // avoid deadlocks by stopping any further state 
reports
+                        // from the thread since we're shutting down
+                        thread.setStateListener(null);
+                        thread.close();
+                    }
+                    if (globalStreamThread != null) {
+                        globalStreamThread.close();
+                        if (!globalStreamThread.stillRunning()) {
                             try {
-                                if (!thread.stillRunning()) {
-                                    thread.join();
-                                }
-                            } catch (final InterruptedException ex) {
+                                globalStreamThread.join();
+                            } catch (final InterruptedException e) {
                                 Thread.interrupted();
                             }
                         }
-
-                        metrics.close();
-                        log.info("Stopped Kafka Streams process");
                     }
+                    for (final StreamThread thread : threads) {
+                        try {
+                            if (!thread.stillRunning()) {
+                                thread.join();
+                            }
+                        } catch (final InterruptedException ex) {
+                            Thread.interrupted();
+                        }
+                    }
+
+                    metrics.close();
+                    log.info("Stopped Kafka Streams process.");
+                }
             }, "kafka-streams-close-thread");
             shutdown.setDaemon(true);
             shutdown.start();
             try {
                 shutdown.join(TimeUnit.MILLISECONDS.convert(timeout, 
timeUnit));
-            } catch (InterruptedException e) {
+            } catch (final InterruptedException e) {
                 Thread.interrupted();
             }
             setState(KafkaStreams.State.NOT_RUNNING);
@@ -473,9 +502,10 @@ public class KafkaStreams {
     }
 
     /**
-     * Produces a string representation containing useful information about 
Kafka Streams
-     * Such as thread IDs, task IDs and a representation of the topology. This 
is useful
-     * in debugging scenarios.
+     * Produce a string representation containing useful information about 
this {@code KafkaStreams} instance such as
+     * thread IDs, task IDs, and a representation of the topology DAG 
including {@link StateStore}s (cf.
+     * {@link TopologyBuilder} and {@link KStreamBuilder}).
+     *
      * @return A string representation of the Kafka Streams instance.
      */
     @Override
@@ -484,13 +514,19 @@ public class KafkaStreams {
     }
 
     /**
-     * Produces a string representation containing useful information about 
Kafka Streams
-     * such as thread IDs, task IDs and a representation of the topology 
starting with the given indent. This is useful
-     * in debugging scenarios.
+     * Produce a string representation containing useful information about 
this {@code KafkaStreams} instance such as
+     * thread IDs, task IDs, and a representation of the topology DAG 
including {@link StateStore}s (cf.
+     * {@link TopologyBuilder} and {@link KStreamBuilder}).
+     *
+     * @param indent the top-level indent for each line
      * @return A string representation of the Kafka Streams instance.
      */
     public String toString(final String indent) {
-        final StringBuilder sb = new StringBuilder(indent + "KafkaStreams 
processID:" + processId + "\n");
+        final StringBuilder sb = new StringBuilder()
+            .append(indent)
+            .append("KafkaStreams processID: ")
+            .append(processId)
+            .append("\n");
         for (final StreamThread thread : threads) {
             sb.append(thread.toString(indent + "\t"));
         }
@@ -500,11 +536,16 @@ public class KafkaStreams {
     }
 
     /**
-     * Cleans up local state store directory ({@code state.dir}), by deleting 
all data with regard to the application-id.
+     * Do a clean up of the local {@link StateStore} directory ({@link 
StreamsConfig#STATE_DIR_CONFIG}) by deleting all
+     * data with regard to the {@link StreamsConfig#APPLICATION_ID_CONFIG 
application ID}.
+     * <p>
+     * May only be called either before this {@code KafkaStreams} instance is 
{@link KafkaStreams#start() started} or
+     * after the instance is {@link KafkaStreams#close() closed}.
      * <p>
-     * May only be called either before instance is started or after instance 
is closed.
+     * Calling this method triggers a restore of local {@link StateStore}s on 
the next {@link KafkaStreams#start()
+     * application start}.
      *
-     * @throws IllegalStateException if instance is currently running
+     * @throws IllegalStateException if the instance is currently running
      */
     public void cleanUp() {
         if (state.isRunning()) {
@@ -515,7 +556,7 @@ public class KafkaStreams {
         final String stateDir = 
config.getString(StreamsConfig.STATE_DIR_CONFIG);
 
         final String localApplicationDir = stateDir + File.separator + appId;
-        log.debug("Removing local Kafka Streams application data in {} for 
application {}",
+        log.debug("Removing local Kafka Streams application data in {} for 
application {}.",
             localApplicationDir,
             appId);
 
@@ -524,9 +565,10 @@ public class KafkaStreams {
     }
 
     /**
-     * Sets the handler invoked when a stream thread abruptly terminates due 
to an uncaught exception.
+     * Set the handler invoked when a {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+     * terminates due to an uncaught exception.
      *
-     * @param eh the object to use as this thread's uncaught exception 
handler. If null then this thread has no explicit handler.
+     * @param eh the uncaught exception handler for all internal threads; 
{@code null} deletes the current handler
      */
     public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh) {
         for (final StreamThread thread : threads) {
@@ -538,25 +580,34 @@ public class KafkaStreams {
         }
     }
 
-
     /**
-     * Find all of the instances of {@link StreamsMetadata} in the {@link 
KafkaStreams} application that this instance belongs to
-     *
+     * Find all currently running {@code KafkaStreams} instances (potentially 
remotely) that use the same
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this 
instance (i.e., all instances that belong to
+     * the same Kafka Streams application) and return {@link StreamsMetadata} 
for each discovered instance.
+     * <p>
      * Note: this is a point in time view and it may change due to partition 
reassignment.
-     * @return collection containing all instances of {@link StreamsMetadata} 
in the {@link KafkaStreams} application that this instance belongs to
+     *
+     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instance 
of this application
      */
     public Collection<StreamsMetadata> allMetadata() {
         validateIsRunning();
         return streamsMetadataState.getAllMetadata();
     }
 
-
     /**
-     * Find instances of {@link StreamsMetadata} that contains the given 
storeName
-     *
+     * Find all currently running {@code KafkaStreams} instances (potentially 
remotely) that
+     * <ul>
+     *   <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG 
application ID} as this instance (i.e., all
+     *       instances that belong to the same Kafka Streams application)</li>
+     *   <li>and that contain a {@link StateStore} with the given {@code 
storeName}</li>
+     * </ul>
+     * and return {@link StreamsMetadata} for each discovered instance.
+     * <p>
      * Note: this is a point in time view and it may change due to partition 
reassignment.
-     * @param storeName the storeName to find metadata for
-     * @return  A collection containing instances of {@link StreamsMetadata} 
that have the provided storeName
+     *
+     * @param storeName the {@code storeName} to find metadata for
+     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instance 
with the provided {@code storeName} of
+     * this application
      */
     public Collection<StreamsMetadata> allMetadataForStore(final String 
storeName) {
         validateIsRunning();
@@ -564,22 +615,35 @@ public class KafkaStreams {
     }
 
     /**
-     * Find the {@link StreamsMetadata} instance that contains the given 
storeName
-     * and the corresponding hosted store instance contains the given key. 
This will use
-     * the {@link 
org.apache.kafka.streams.processor.internals.DefaultStreamPartitioner} to
-     * locate the partition. If a custom partitioner has been used please use
-     * {@link KafkaStreams#metadataForKey(String, Object, StreamPartitioner)}
-     *
-     * Note: the key may not exist in the {@link 
org.apache.kafka.streams.processor.StateStore},
-     * this method provides a way of finding which host it would exist on.
+     * Find the currently running {@code KafkaStreams} instance (potentially 
remotely) that
+     * <ul>
+     *   <li>uses the same {@link StreamsConfig#APPLICATION_ID_CONFIG 
application ID} as this instance (i.e., all
+     *       instances that belong to the same Kafka Streams application)</li>
+     *   <li>and that contains a {@link StateStore} with the given {@code 
storeName}</li>
+     *   <li>and the {@link StateStore} contains the given {@code key}</li>
+     * </ul>
+     * and return {@link StreamsMetadata} for it.
+     * <p>
+     * This will use the default Kafka Streams partitioner to locate the 
partition.
+     * If a {@link StreamPartitioner custom partitioner} has been
+     * {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link 
StreamsConfig},
+     * {@link KStream#through(StreamPartitioner, String)}, or {@link 
KTable#through(StreamPartitioner, String, String)},
+     * or if the original {@link KTable}'s input {@link 
KStreamBuilder#table(String, String) topic} is partitioned
+     * differently, please use {@link KafkaStreams#metadataForKey(String, 
Object, StreamPartitioner)}.
+     * <p>
+     * Note:
+     * <ul>
+     *   <li>this is a point in time view and it may change due to partition 
reassignment</li>
+     *   <li>the key may not exist in the {@link StateStore}; this method 
provides a way of finding which host it
+     *       <em>would</em> exist on</li>
+     * </ul>
      *
-     * Note: this is a point in time view and it may change due to partition 
reassignment.
-     * @param storeName         Name of the store
-     * @param key               Key to use to for partition
-     * @param keySerializer     Serializer for the key
-     * @param <K>               key type
-     * @return  The {@link StreamsMetadata} for the storeName and key or 
{@link StreamsMetadata#NOT_AVAILABLE}
-     * if streams is (re-)initializing
+     * @param storeName     the {@code storeName} to find metadata for
+     * @param key           the key to find metadata for
+     * @param keySerializer serializer for the key
+     * @param <K>           key type
+     * @return {@link StreamsMetadata} for the {@code KafkaStreams} instance 
with the provided {@code storeName} and
+     * {@code key} of this application or {@link 
StreamsMetadata#NOT_AVAILABLE} if Kafka Streams is (re-)initializing
      */
     public <K> StreamsMetadata metadataForKey(final String storeName,
                                               final K key,
@@ -589,19 +653,28 @@ public class KafkaStreams {
     }
 
     /**
-     * Find the {@link StreamsMetadata} instance that contains the given 
storeName
-     * and the corresponding hosted store instance contains the given key
-     *
-     * Note: the key may not exist in the {@link 
org.apache.kafka.streams.processor.StateStore},
-     * this method provides a way of finding which host it would exist on.
+     * Find the currently running {@code KafkaStreams} instance (potentially 
remotely) that
+     * <ul>
+     *   <li>uses the same {@link StreamsConfig#APPLICATION_ID_CONFIG 
application ID} as this instance (i.e., all
+     *       instances that belong to the same Kafka Streams application)</li>
+     *   <li>and that contains a {@link StateStore} with the given {@code 
storeName}</li>
+     *   <li>and the {@link StateStore} contains the given {@code key}</li>
+     * </ul>
+     * and return {@link StreamsMetadata} for it.
+     * <p>
+     * Note:
+     * <ul>
+     *   <li>this is a point in time view and it may change due to partition 
reassignment</li>
+     *   <li>the key may not exist in the {@link StateStore}; this method 
provides a way of finding which host it
+     *       <em>would</em> exist on</li>
+     * </ul>
      *
-     * Note: this is a point in time view and it may change due to partition 
reassignment.
-     * @param storeName         Name of the store
-     * @param key               Key to use to for partition
-     * @param partitioner       Partitioner for the store
-     * @param <K>               key type
-     * @return  The {@link StreamsMetadata} for the storeName and key or 
{@link StreamsMetadata#NOT_AVAILABLE}
-     * if streams is (re-)initializing
+     * @param storeName   the {@code storeName} to find metadata for
+     * @param key         the key to find metadata for
+     * @param partitioner the partitioner to be used to locate the host for 
the key
+     * @param <K>         key type
+     * @return {@link StreamsMetadata} for the {@code KafkaStreams} instance 
with the provided {@code storeName} and
+     * {@code key} of this application or {@link 
StreamsMetadata#NOT_AVAILABLE} if Kafka Streams is (re-)initializing
      */
     public <K> StreamsMetadata metadataForKey(final String storeName,
                                               final K key,
@@ -610,17 +683,17 @@ public class KafkaStreams {
         return streamsMetadataState.getMetadataWithKey(storeName, key, 
partitioner);
     }
 
-
     /**
-     * Get a facade wrapping the {@link 
org.apache.kafka.streams.processor.StateStore} instances
-     * with the provided storeName and accepted by {@link 
QueryableStoreType#accepts(StateStore)}.
-     * The returned object can be used to query the {@link 
org.apache.kafka.streams.processor.StateStore} instances
-     * @param storeName             name of the store to find
-     * @param queryableStoreType    accept only stores that are accepted by 
{@link QueryableStoreType#accepts(StateStore)}
-     * @param <T>                   return type
-     * @return  A facade wrapping the {@link 
org.apache.kafka.streams.processor.StateStore} instances
-     * @throws org.apache.kafka.streams.errors.InvalidStateStoreException if 
the streams are (re-)initializing or
-     * a store with storeName and queryableStoreType doesnt' exist.
+     * Get a facade wrapping the local {@link StateStore} instances with the 
provided {@code storeName} if the Store's
+     * type is accepted by the provided {@link 
QueryableStoreType#accepts(StateStore) queryableStoreType}.
+     * The returned object can be used to query the {@link StateStore} 
instances.
+     *
+     * @param storeName           name of the store to find
+     * @param queryableStoreType  accept only stores that are accepted by 
{@link QueryableStoreType#accepts(StateStore)}
+     * @param <T>                 return type
+     * @return A facade wrapping the local {@link StateStore} instances
+     * @throws InvalidStateStoreException if Kafka Streams is 
(re-)initializing or a store with {@code storeName} and
+     * {@code queryableStoreType} doesn't exist
      */
     public <T> T store(final String storeName, final QueryableStoreType<T> 
queryableStoreType) {
         validateIsRunning();
@@ -629,7 +702,7 @@ public class KafkaStreams {
 
     private void validateIsRunning() {
         if (!state.isRunning()) {
-            throw new IllegalStateException("KafkaStreams is not running. 
State is " + state);
+            throw new IllegalStateException("KafkaStreams is not running. 
State is " + state + ".");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/903548f1/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java 
b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
index 64b38cd..0c1d2af 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
@@ -21,8 +21,7 @@ import java.util.Objects;
 
 /**
  * A key-value pair defined for a single Kafka Streams record.
- * If the record comes directly from a Kafka topic then its
- * key / value are defined as the message key / value.
+ * If the record comes directly from a Kafka topic then its key/value are 
defined as the message key/value.
  *
  * @param <K> Key type
  * @param <V> Value type
@@ -37,10 +36,10 @@ public class KeyValue<K, V> {
     /**
      * Create a new key-value pair.
      *
-     * @param key    the key
-     * @param value  the value
+     * @param key   the key
+     * @param value the value
      */
-    public KeyValue(K key, V value) {
+    public KeyValue(final K key, final V value) {
         this.key = key;
         this.value = value;
     }
@@ -48,22 +47,23 @@ public class KeyValue<K, V> {
     /**
      * Create a new key-value pair.
      *
-     * @param key    the key
-     * @param value  the value
-     * @param <K>    the type of the key
-     * @param <V>    the type of the value
-     * @return       a new key value pair
+     * @param key   the key
+     * @param value the value
+     * @param <K>   the type of the key
+     * @param <V>   the type of the value
+     * @return a new key-value pair
      */
-    public static <K, V> KeyValue<K, V> pair(K key, V value) {
+    public static <K, V> KeyValue<K, V> pair(final K key, final V value) {
         return new KeyValue<>(key, value);
     }
 
+    @Override
     public String toString() {
         return "KeyValue(" + key + ", " + value + ")";
     }
 
     @Override
-    public boolean equals(Object obj) {
+    public boolean equals(final Object obj) {
         if (this == obj)
             return true;
 
@@ -71,9 +71,9 @@ public class KeyValue<K, V> {
             return false;
         }
 
-        KeyValue other = (KeyValue) obj;
-        return (this.key == null ? other.key == null : 
this.key.equals(other.key))
-                && (this.value == null ? other.value == null : 
this.value.equals(other.value));
+        final KeyValue other = (KeyValue) obj;
+        return (key == null ? other.key == null : key.equals(other.key))
+                && (value == null ? other.value == null : 
value.equals(other.value));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/903548f1/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index d7d6566..57db027 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
@@ -29,8 +31,8 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 
@@ -43,303 +45,345 @@ import static 
org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
 /**
- * Configuration for Kafka Streams. Documentation for these configurations can 
be found in the <a
- * href="http://kafka.apache.org/documentation.html#streamsconfigs";>Kafka 
documentation</a>
+ * Configuration for a {@link KafkaStreams} instance.
+ * Can also be used to configure the Kafka Streams internal {@link 
KafkaConsumer} and {@link KafkaProducer}.
+ * To avoid consumer/producer property conflicts, you should prefix those 
properties using
+ * {@link #consumerPrefix(String)} and {@link #producerPrefix(String)}, 
respectively.
+ * <p>
+ * Example:
+ * <pre>{@code
+ * // potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer 
AND consumer
+ * Properties streamsProperties = new Properties();
+ * streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
+ * // or
+ * streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);
+ *
+ * // suggested:
+ * Properties streamsProperties = new Properties();
+ * // sets "metadata.max.age.ms" to 1 minute for consumer only
+ * 
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
 60000);
+ * // sets "metadata.max.age.ms" to 1 minute for producer only
+ * 
streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG),
 60000);
+ *
+ * StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
+ * }</pre>
+ * Kafka Streams requires you to set at least the properties {@link 
#APPLICATION_ID_CONFIG "application.id"} and
+ * {@link #BOOTSTRAP_SERVERS_CONFIG "bootstrap.servers"}.
+ * Furthermore, it is not allowed to enable {@link 
ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG "enable.auto.commit"}, which
+ * is disabled by Kafka Streams by default.
+ *
+ * @see 
KafkaStreams#KafkaStreams(org.apache.kafka.streams.processor.TopologyBuilder, 
StreamsConfig)
+ * @see ConsumerConfig
+ * @see ProducerConfig
  */
 public class StreamsConfig extends AbstractConfig {
 
     private static final ConfigDef CONFIG;
 
-    // Prefix used to isolate consumer configs from producer configs.
+    /**
+     * Prefix used to isolate {@link KafkaConsumer consumer} configs from 
{@link KafkaProducer producer} configs.
+     * It is recommended to use {@link #consumerPrefix(String)} to add this 
prefix to {@link ConsumerConfig consumer
+     * properties}.
+     */
     public static final String CONSUMER_PREFIX = "consumer.";
 
     // Prefix used to isolate producer configs from consumer configs.
+    /**
+     * Prefix used to isolate {@link KafkaProducer producer} configs from 
{@link KafkaConsumer consumer} configs.
+     * It is recommended to use {@link #producerPrefix(String)} to add this 
prefix to {@link ProducerConfig producer
+     * properties}.
+     */
     public static final String PRODUCER_PREFIX = "producer.";
 
-    /** <code>state.dir</code> */
+    /** {@code state.dir} */
     public static final String STATE_DIR_CONFIG = "state.dir";
     private static final String STATE_DIR_DOC = "Directory location for state 
store.";
 
-    /** <code>zookeeper.connect<code/> */
+    /**
+     * {@code zookeeper.connect}
+     * @deprecated Kafka Streams does not use Zookeeper anymore and this 
parameter will be ignored.
+     */
+    @Deprecated
     public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
     private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect 
string for Kafka topics management.";
 
-    /** <code>commit.interval.ms</code> */
+    /** {@code commit.interval.ms} */
     public static final String COMMIT_INTERVAL_MS_CONFIG = 
"commit.interval.ms";
     private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with 
which to save the position of the processor.";
 
-    /** <code>poll.ms</code> */
+    /** {@code poll.ms} */
     public static final String POLL_MS_CONFIG = "poll.ms";
     private static final String POLL_MS_DOC = "The amount of time in 
milliseconds to block waiting for input.";
 
-    /** <code>num.stream.threads</code> */
+    /** {@code num.stream.threads} */
     public static final String NUM_STREAM_THREADS_CONFIG = 
"num.stream.threads";
     private static final String NUM_STREAM_THREADS_DOC = "The number of 
threads to execute stream processing.";
 
-    /** <code>num.standby.replicas</code> */
+    /** {@code num.standby.replicas} */
     public static final String NUM_STANDBY_REPLICAS_CONFIG = 
"num.standby.replicas";
     private static final String NUM_STANDBY_REPLICAS_DOC = "The number of 
standby replicas for each task.";
 
-    /** <code>buffered.records.per.partition</code> */
+    /** {@code buffered.records.per.partition} */
     public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = 
"buffered.records.per.partition";
     private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The 
maximum number of records to buffer per partition.";
 
-    /** <code>state.cleanup.delay</code> */
+    /** {@code state.cleanup.delay} */
     public static final String STATE_CLEANUP_DELAY_MS_CONFIG = 
"state.cleanup.delay.ms";
     private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of 
time in milliseconds to wait before deleting state when a partition has 
migrated.";
 
-    /** <code>timestamp.extractor</code> */
+    /** {@code timestamp.extractor} */
     public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = 
"timestamp.extractor";
     private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp 
extractor class that implements the <code>TimestampExtractor</code> interface.";
 
-    /** <code>partition.grouper</code> */
+    /** {@code partition.grouper} */
     public static final String PARTITION_GROUPER_CLASS_CONFIG = 
"partition.grouper";
     private static final String PARTITION_GROUPER_CLASS_DOC = "Partition 
grouper class that implements the <code>PartitionGrouper</code> interface.";
 
-    /** <code>application.id</code> */
+    /** {@code application.id} */
     public static final String APPLICATION_ID_CONFIG = "application.id";
-    public static final String APPLICATION_ID_DOC = "An identifier for the 
stream processing application. Must be unique within the Kafka cluster. It is 
used as 1) the default client-id prefix, 2) the group-id for membership 
management, 3) the changelog topic prefix.";
+    private static final String APPLICATION_ID_DOC = "An identifier for the 
stream processing application. Must be unique within the Kafka cluster. It is 
used as 1) the default client-id prefix, 2) the group-id for membership 
management, 3) the changelog topic prefix.";
 
-    /** <code>replication.factor</code> */
+    /** {@code replication.factor} */
     public static final String REPLICATION_FACTOR_CONFIG = 
"replication.factor";
-    public static final String REPLICATION_FACTOR_DOC = "The replication 
factor for change log topics and repartition topics created by the stream 
processing application.";
+    private static final String REPLICATION_FACTOR_DOC = "The replication 
factor for change log topics and repartition topics created by the stream 
processing application.";
 
-    /** <code>key.serde</code> */
+    /** {@code key.serde} */
     public static final String KEY_SERDE_CLASS_CONFIG = "key.serde";
-    public static final String KEY_SERDE_CLASS_DOC = "Serializer / 
deserializer class for key that implements the <code>Serde</code> interface.";
+    private static final String KEY_SERDE_CLASS_DOC = "Serializer / 
deserializer class for key that implements the <code>Serde</code> interface.";
 
-    /** <code>value.serde</code> */
+    /** {@code value.serde} */
     public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde";
-    public static final String VALUE_SERDE_CLASS_DOC = "Serializer / 
deserializer class for value that implements the <code>Serde</code> interface.";
+    private static final String VALUE_SERDE_CLASS_DOC = "Serializer / 
deserializer class for value that implements the <code>Serde</code> interface.";
 
-    /**<code>user.endpoint</code> */
+    /** {@code application.server} */
     public static final String APPLICATION_SERVER_CONFIG = 
"application.server";
-    public static final String APPLICATION_SERVER_DOC = "A host:port pair 
pointing to an embedded user defined endpoint that can be used for discovering 
the locations of state stores within a single KafkaStreams application";
+    private static final String APPLICATION_SERVER_DOC = "A host:port pair 
pointing to an embedded user defined endpoint that can be used for discovering 
the locations of state stores within a single KafkaStreams application";
 
-    /** <code>metrics.sample.window.ms</code> */
+    /** {@code metrics.sample.window.ms} */
     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = 
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
 
-    /** <code>metrics.num.samples</code> */
+    /** {@code metrics.num.samples} */
     public static final String METRICS_NUM_SAMPLES_CONFIG = 
CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
 
-    /** <code>metrics.record.level</code> */
+    /** {@code metrics.record.level} */
     public static final String METRICS_RECORDING_LEVEL_CONFIG = 
CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
 
-    /** <code>metric.reporters</code> */
+    /** {@code metric.reporters} */
     public static final String METRIC_REPORTER_CLASSES_CONFIG = 
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
 
-    /** <code>bootstrap.servers</code> */
+    /** {@code bootstrap.servers} */
     public static final String BOOTSTRAP_SERVERS_CONFIG = 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
-    /** <code>client.id</code> */
+    /** {@code client.id} */
     public static final String CLIENT_ID_CONFIG = 
CommonClientConfigs.CLIENT_ID_CONFIG;
 
-    /** <code>rocksdb.config.setter</code> */
+    /** {@code rocksdb.config.setter} */
     public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = 
"rocksdb.config.setter";
-    public static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB 
config setter class that implements the <code>RocksDBConfigSetter</code> 
interface";
+    private static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB 
config setter class that implements the <code>RocksDBConfigSetter</code> 
interface";
 
-    /** <code>windowstore.changelog.additional.retention.ms</code> */
+    /** {@code windowstore.changelog.additional.retention.ms} */
     public static final String 
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = 
"windowstore.changelog.additional.retention.ms";
-    public static final String 
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows 
maintainMs to ensure data is not deleted from the log prematurely. Allows for 
clock drift. Default is 1 day";
+    private static final String 
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows 
maintainMs to ensure data is not deleted from the log prematurely. Allows for 
clock drift. Default is 1 day";
 
-    /** <code>cache.max.bytes.buffering</code> */
+    /** {@code cache.max.bytes.buffering} */
     public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = 
"cache.max.bytes.buffering";
-    public static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number 
of memory bytes to be used for buffering across all threads";
+    private static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum 
number of memory bytes to be used for buffering across all threads";
 
     public static final String SECURITY_PROTOCOL_CONFIG = 
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
-    public static final String SECURITY_PROTOCOL_DOC = 
CommonClientConfigs.SECURITY_PROTOCOL_DOC;
+    private static final String SECURITY_PROTOCOL_DOC = 
CommonClientConfigs.SECURITY_PROTOCOL_DOC;
     public static final String DEFAULT_SECURITY_PROTOCOL = 
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
 
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = 
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
-    public static final String CONNECTIONS_MAX_IDLE_MS_DOC = 
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC;
+    private static final String CONNECTIONS_MAX_IDLE_MS_DOC = 
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC;
 
     public static final String RETRY_BACKOFF_MS_CONFIG = 
CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
-    public static final String RETRY_BACKOFF_MS_DOC = 
CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
+    private static final String RETRY_BACKOFF_MS_DOC = 
CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
 
     public static final String METADATA_MAX_AGE_CONFIG = 
CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
-    public static final String METADATA_MAX_AGE_DOC = 
CommonClientConfigs.METADATA_MAX_AGE_DOC;
+    private static final String METADATA_MAX_AGE_DOC = 
CommonClientConfigs.METADATA_MAX_AGE_DOC;
 
     public static final String RECONNECT_BACKOFF_MS_CONFIG = 
CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
-    public static final String RECONNECT_BACKOFF_MS_DOC = 
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC;
+    private static final String RECONNECT_BACKOFF_MS_DOC = 
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC;
 
     public static final String SEND_BUFFER_CONFIG = 
CommonClientConfigs.SEND_BUFFER_CONFIG;
-    public static final String SEND_BUFFER_DOC = 
CommonClientConfigs.SEND_BUFFER_DOC;
+    private static final String SEND_BUFFER_DOC = 
CommonClientConfigs.SEND_BUFFER_DOC;
 
     public static final String RECEIVE_BUFFER_CONFIG = 
CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
-    public static final String RECEIVE_BUFFER_DOC = 
CommonClientConfigs.RECEIVE_BUFFER_DOC;
+    private static final String RECEIVE_BUFFER_DOC = 
CommonClientConfigs.RECEIVE_BUFFER_DOC;
 
     public static final String REQUEST_TIMEOUT_MS_CONFIG = 
CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
-    public static final String REQUEST_TIMEOUT_MS_DOC = 
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
+    private static final String REQUEST_TIMEOUT_MS_DOC = 
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
 
     static {
-        CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG,      // 
required with no default value
-                Type.STRING,
-                Importance.HIGH,
-                StreamsConfig.APPLICATION_ID_DOC)
-                .define(BOOTSTRAP_SERVERS_CONFIG,       // required with no 
default value
-                        Type.LIST,
-                        Importance.HIGH,
-                        CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
-                .define(CLIENT_ID_CONFIG,
-                        Type.STRING,
-                        "",
-                        Importance.HIGH,
-                        CommonClientConfigs.CLIENT_ID_DOC)
-                .define(ZOOKEEPER_CONNECT_CONFIG,
-                        Type.STRING,
-                        "",
-                        Importance.HIGH,
-                        StreamsConfig.ZOOKEEPER_CONNECT_DOC)
-                .define(STATE_DIR_CONFIG,
-                        Type.STRING,
-                        "/tmp/kafka-streams",
-                        Importance.MEDIUM,
-                        STATE_DIR_DOC)
-                .define(REPLICATION_FACTOR_CONFIG,
-                        Type.INT,
-                        1,
-                        Importance.MEDIUM,
-                        REPLICATION_FACTOR_DOC)
-                .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
-                        Type.CLASS,
-                        FailOnInvalidTimestamp.class.getName(),
-                        Importance.MEDIUM,
-                        TIMESTAMP_EXTRACTOR_CLASS_DOC)
-                .define(PARTITION_GROUPER_CLASS_CONFIG,
-                        Type.CLASS,
-                        DefaultPartitionGrouper.class.getName(),
-                        Importance.MEDIUM,
-                        PARTITION_GROUPER_CLASS_DOC)
-                .define(KEY_SERDE_CLASS_CONFIG,
-                        Type.CLASS,
-                        Serdes.ByteArraySerde.class.getName(),
-                        Importance.MEDIUM,
-                        KEY_SERDE_CLASS_DOC)
-                .define(VALUE_SERDE_CLASS_CONFIG,
-                        Type.CLASS,
-                        Serdes.ByteArraySerde.class.getName(),
-                        Importance.MEDIUM,
-                        VALUE_SERDE_CLASS_DOC)
-                .define(COMMIT_INTERVAL_MS_CONFIG,
-                        Type.LONG,
-                        30000,
-                        Importance.LOW,
-                        COMMIT_INTERVAL_MS_DOC)
-                .define(POLL_MS_CONFIG,
-                        Type.LONG,
-                        100,
-                        Importance.LOW,
-                        POLL_MS_DOC)
-                .define(NUM_STREAM_THREADS_CONFIG,
-                        Type.INT,
-                        1,
-                        Importance.LOW,
-                        NUM_STREAM_THREADS_DOC)
-                .define(NUM_STANDBY_REPLICAS_CONFIG,
-                        Type.INT,
-                        0,
-                        Importance.LOW,
-                        NUM_STANDBY_REPLICAS_DOC)
-                .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
-                        Type.INT,
-                        1000,
-                        Importance.LOW,
-                        BUFFERED_RECORDS_PER_PARTITION_DOC)
-                .define(STATE_CLEANUP_DELAY_MS_CONFIG,
-                        Type.LONG,
-                        60000,
-                        Importance.LOW,
-                        STATE_CLEANUP_DELAY_MS_DOC)
-                .define(METRIC_REPORTER_CLASSES_CONFIG,
-                        Type.LIST,
-                        "",
-                        Importance.LOW,
-                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
-                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
-                        Type.LONG,
-                        30000,
-                        atLeast(0),
-                        Importance.LOW,
-                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
-                .define(METRICS_NUM_SAMPLES_CONFIG,
-                        Type.INT,
-                        2,
-                        atLeast(1),
-                        Importance.LOW,
-                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
-                .define(METRICS_RECORDING_LEVEL_CONFIG,
-                        Type.STRING,
-                        Sensor.RecordingLevel.INFO.toString(),
-                        in(Sensor.RecordingLevel.INFO.toString(), 
Sensor.RecordingLevel.DEBUG.toString()),
-                        Importance.LOW,
-                        CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
-                .define(APPLICATION_SERVER_CONFIG,
-                        Type.STRING,
-                        "",
-                        Importance.LOW,
-                        APPLICATION_SERVER_DOC)
-                .define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
-                        Type.CLASS,
-                        null,
-                        Importance.LOW,
-                        ROCKSDB_CONFIG_SETTER_CLASS_DOC)
-                .define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
-                        Type.LONG,
-                        24 * 60 * 60 * 1000,
-                        Importance.MEDIUM,
-                        WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC)
-                .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                        Type.LONG,
-                        10 * 1024 * 1024L,
-                        atLeast(0),
-                        Importance.LOW,
-                        CACHE_MAX_BYTES_BUFFERING_DOC)
-                .define(SECURITY_PROTOCOL_CONFIG,
-                        Type.STRING,
-                        DEFAULT_SECURITY_PROTOCOL,
-                        Importance.MEDIUM,
-                        SECURITY_PROTOCOL_DOC)
-                .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
-                        ConfigDef.Type.LONG,
-                        9 * 60 * 1000,
-                        ConfigDef.Importance.MEDIUM,
-                        CONNECTIONS_MAX_IDLE_MS_DOC)
-                .define(RETRY_BACKOFF_MS_CONFIG,
-                        ConfigDef.Type.LONG,
-                        100L,
-                        atLeast(0L),
-                        ConfigDef.Importance.LOW,
-                        RETRY_BACKOFF_MS_DOC)
-                .define(METADATA_MAX_AGE_CONFIG,
-                        ConfigDef.Type.LONG,
-                        5 * 60 * 1000,
-                        atLeast(0),
-                        ConfigDef.Importance.LOW,
-                        METADATA_MAX_AGE_DOC)
-                .define(RECONNECT_BACKOFF_MS_CONFIG,
-                        ConfigDef.Type.LONG,
-                        50L,
-                        atLeast(0L),
-                        ConfigDef.Importance.LOW,
-                        RECONNECT_BACKOFF_MS_DOC)
-                .define(SEND_BUFFER_CONFIG,
-                        ConfigDef.Type.INT,
-                        128 * 1024,
-                        atLeast(0),
-                        ConfigDef.Importance.MEDIUM,
-                        SEND_BUFFER_DOC)
-                .define(RECEIVE_BUFFER_CONFIG,
-                        ConfigDef.Type.INT,
-                        32 * 1024,
-                        atLeast(0),
-                        ConfigDef.Importance.MEDIUM,
-                        RECEIVE_BUFFER_DOC)
-                .define(REQUEST_TIMEOUT_MS_CONFIG,
-                        ConfigDef.Type.INT,
-                        40 * 1000,
-                        atLeast(0),
-                        ConfigDef.Importance.MEDIUM,
-                        REQUEST_TIMEOUT_MS_DOC);
+        CONFIG = new ConfigDef()
+            .define(APPLICATION_ID_CONFIG, // required with no default value
+                    Type.STRING,
+                    Importance.HIGH,
+                    APPLICATION_ID_DOC)
+            .define(BOOTSTRAP_SERVERS_CONFIG, // required with no default value
+                    Type.LIST,
+                    Importance.HIGH,
+                    CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+            .define(CLIENT_ID_CONFIG,
+                    Type.STRING,
+                    "",
+                    Importance.HIGH,
+                    CommonClientConfigs.CLIENT_ID_DOC)
+            .define(ZOOKEEPER_CONNECT_CONFIG,
+                    Type.STRING,
+                    "",
+                    Importance.HIGH,
+                    ZOOKEEPER_CONNECT_DOC)
+            .define(STATE_DIR_CONFIG,
+                    Type.STRING,
+                    "/tmp/kafka-streams",
+                    Importance.MEDIUM,
+                    STATE_DIR_DOC)
+            .define(REPLICATION_FACTOR_CONFIG,
+                    Type.INT,
+                    1,
+                    Importance.MEDIUM,
+                    REPLICATION_FACTOR_DOC)
+            .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+                    Type.CLASS,
+                    FailOnInvalidTimestamp.class.getName(),
+                    Importance.MEDIUM,
+                    TIMESTAMP_EXTRACTOR_CLASS_DOC)
+            .define(PARTITION_GROUPER_CLASS_CONFIG,
+                    Type.CLASS,
+                    DefaultPartitionGrouper.class.getName(),
+                    Importance.MEDIUM,
+                    PARTITION_GROUPER_CLASS_DOC)
+            .define(KEY_SERDE_CLASS_CONFIG,
+                    Type.CLASS,
+                    Serdes.ByteArraySerde.class.getName(),
+                    Importance.MEDIUM,
+                    KEY_SERDE_CLASS_DOC)
+            .define(VALUE_SERDE_CLASS_CONFIG,
+                    Type.CLASS,
+                    Serdes.ByteArraySerde.class.getName(),
+                    Importance.MEDIUM,
+                    VALUE_SERDE_CLASS_DOC)
+            .define(COMMIT_INTERVAL_MS_CONFIG,
+                    Type.LONG,
+                    30000,
+                    Importance.LOW,
+                    COMMIT_INTERVAL_MS_DOC)
+            .define(POLL_MS_CONFIG,
+                    Type.LONG,
+                    100,
+                    Importance.LOW,
+                    POLL_MS_DOC)
+            .define(NUM_STREAM_THREADS_CONFIG,
+                    Type.INT,
+                    1,
+                    Importance.LOW,
+                    NUM_STREAM_THREADS_DOC)
+            .define(NUM_STANDBY_REPLICAS_CONFIG,
+                    Type.INT,
+                    0,
+                    Importance.LOW,
+                    NUM_STANDBY_REPLICAS_DOC)
+            .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
+                    Type.INT,
+                    1000,
+                    Importance.LOW,
+                    BUFFERED_RECORDS_PER_PARTITION_DOC)
+            .define(STATE_CLEANUP_DELAY_MS_CONFIG,
+                    Type.LONG,
+                    60000,
+                    Importance.LOW,
+                    STATE_CLEANUP_DELAY_MS_DOC)
+            .define(METRIC_REPORTER_CLASSES_CONFIG,
+                    Type.LIST,
+                    "",
+                    Importance.LOW,
+                    CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+            .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
+                    Type.LONG,
+                    30000,
+                    atLeast(0),
+                    Importance.LOW,
+                    CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+            .define(METRICS_NUM_SAMPLES_CONFIG,
+                    Type.INT,
+                    2,
+                    atLeast(1),
+                    Importance.LOW,
+                    CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+            .define(METRICS_RECORDING_LEVEL_CONFIG,
+                    Type.STRING,
+                    Sensor.RecordingLevel.INFO.toString(),
+                    in(Sensor.RecordingLevel.INFO.toString(), 
Sensor.RecordingLevel.DEBUG.toString()),
+                    Importance.LOW,
+                    CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
+            .define(APPLICATION_SERVER_CONFIG,
+                    Type.STRING,
+                    "",
+                    Importance.LOW,
+                    APPLICATION_SERVER_DOC)
+            .define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
+                    Type.CLASS,
+                    null,
+                    Importance.LOW,
+                    ROCKSDB_CONFIG_SETTER_CLASS_DOC)
+            .define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
+                    Type.LONG,
+                    24 * 60 * 60 * 1000,
+                    Importance.MEDIUM,
+                    WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC)
+            .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                    Type.LONG,
+                    10 * 1024 * 1024L,
+                    atLeast(0),
+                    Importance.LOW,
+                    CACHE_MAX_BYTES_BUFFERING_DOC)
+            .define(SECURITY_PROTOCOL_CONFIG,
+                    Type.STRING,
+                    DEFAULT_SECURITY_PROTOCOL,
+                    Importance.MEDIUM,
+                    SECURITY_PROTOCOL_DOC)
+            .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
+                    ConfigDef.Type.LONG,
+                    9 * 60 * 1000,
+                    ConfigDef.Importance.MEDIUM,
+                    CONNECTIONS_MAX_IDLE_MS_DOC)
+            .define(RETRY_BACKOFF_MS_CONFIG,
+                    ConfigDef.Type.LONG,
+                    100L,
+                    atLeast(0L),
+                    ConfigDef.Importance.LOW,
+                    RETRY_BACKOFF_MS_DOC)
+            .define(METADATA_MAX_AGE_CONFIG,
+                    ConfigDef.Type.LONG,
+                    5 * 60 * 1000,
+                    atLeast(0),
+                    ConfigDef.Importance.LOW,
+                    METADATA_MAX_AGE_DOC)
+            .define(RECONNECT_BACKOFF_MS_CONFIG,
+                    ConfigDef.Type.LONG,
+                    50L,
+                    atLeast(0L),
+                    ConfigDef.Importance.LOW,
+                    RECONNECT_BACKOFF_MS_DOC)
+            .define(SEND_BUFFER_CONFIG,
+                    ConfigDef.Type.INT,
+                    128 * 1024,
+                    atLeast(0),
+                    ConfigDef.Importance.MEDIUM,
+                    SEND_BUFFER_DOC)
+            .define(RECEIVE_BUFFER_CONFIG,
+                    ConfigDef.Type.INT,
+                    32 * 1024,
+                    atLeast(0),
+                    ConfigDef.Importance.MEDIUM,
+                    RECEIVE_BUFFER_DOC)
+            .define(REQUEST_TIMEOUT_MS_CONFIG,
+                    ConfigDef.Type.INT,
+                    40 * 1000,
+                    atLeast(0),
+                    ConfigDef.Importance.MEDIUM,
+                    REQUEST_TIMEOUT_MS_DOC);
     }
 
     // this is the list of configs for underlying clients
@@ -347,7 +391,7 @@ public class StreamsConfig extends AbstractConfig {
     private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
     static
     {
-        Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, 
"100");
 
         PRODUCER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
@@ -356,7 +400,7 @@ public class StreamsConfig extends AbstractConfig {
     private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
     static
     {
-        Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
+        final Map<String, Object> tempConsumerDefaultOverrides = new 
HashMap<>();
         
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
"1000");
         
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
@@ -369,160 +413,177 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     /**
-     * Prefix a property with {@link StreamsConfig#CONSUMER_PREFIX}. This is 
used to isolate consumer configs
-     * from producer configs
-     * @param consumerProp
-     * @return CONSUMER_PREFIX + consumerProp
+     * Prefix a property with {@link #CONSUMER_PREFIX}. This is used to 
isolate {@link ConsumerConfig consumer configs}
+     * from {@link ProducerConfig producer configs}.
+     *
+     * @param consumerProp the consumer property to be masked
+     * @return {@link #CONSUMER_PREFIX} + {@code consumerProp}
      */
     public static String consumerPrefix(final String consumerProp) {
         return CONSUMER_PREFIX + consumerProp;
     }
 
     /**
-     * Prefix a property with {@link StreamsConfig#PRODUCER_PREFIX}. This is 
used to isolate producer configs
-     * from consumer configs
-     * @param producerProp
-     * @return PRODUCER_PREFIX + consumerProp
+     * Prefix a property with {@link #PRODUCER_PREFIX}. This is used to 
isolate {@link ProducerConfig producer configs}
+     * from {@link ConsumerConfig consumer configs}.
+     *
+     * @param producerProp the producer property to be masked
+     * @return {@link #PRODUCER_PREFIX} + {@code producerProp}
      */
     public static String producerPrefix(final String producerProp) {
         return PRODUCER_PREFIX + producerProp;
     }
 
     /**
-     * Returns a copy of the config definition.
+     * Return a copy of the config definition.
+     *
+     * @return a copy of the config definition
      */
     public static ConfigDef configDef() {
         return new ConfigDef(CONFIG);
     }
 
-    public StreamsConfig(Map<?, ?> props) {
-        super(CONFIG, props);
-    }
-
     /**
-     * Get the configs specific to the Consumer. Properties using the prefix 
{@link StreamsConfig#CONSUMER_PREFIX}
-     * will be used in favor over their non-prefixed versions except in the 
case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG}
-     * where we always use the non-prefixed version as we only support 
reading/writing from/to the same Kafka Cluster
-     * @param streamThread   the {@link StreamThread} creating a consumer
-     * @param groupId        consumer groupId
-     * @param clientId       clientId
-     * @return  Map of the Consumer configuration.
-     * @throws ConfigException
+     * Create a new {@code StreamsConfig} using the given properties.
+     *
+     * @param props properties that specify Kafka Streams and internal 
consumer/producer configuration
      */
-    public Map<String, Object> getConsumerConfigs(StreamThread streamThread, 
String groupId, String clientId) throws ConfigException {
-
-        final Map<String, Object> consumerProps = new 
HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
+    public StreamsConfig(final Map<?, ?> props) {
+        super(CONFIG, props);
+    }
 
+    private Map<String, Object> getCommonConsumerConfigs() throws 
ConfigException {
         final Map<String, Object> clientProvidedProps = 
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
 
         // disable auto commit and throw exception if there is user overridden 
values,
         // this is necessary for streams commit semantics
         if 
(clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
             throw new ConfigException("Unexpected user-specified consumer 
config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
-                    + ", as the streams client will always turn off auto 
committing.");
+                + ", as the streams client will always turn off auto 
committing.");
         }
 
+        final Map<String, Object> consumerProps = new 
HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
         consumerProps.putAll(clientProvidedProps);
 
         // bootstrap.servers should be from StreamsConfig
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
originals().get(BOOTSTRAP_SERVERS_CONFIG));
+        // remove deprecated ZK config
+        consumerProps.remove(ZOOKEEPER_CONNECT_CONFIG);
+
+        return consumerProps;
+    }
+
+    /**
+     * Get the configs to the {@link KafkaConsumer consumer}.
+     * Properties using the prefix {@link #CONSUMER_PREFIX} will be used in 
favor over their non-prefixed versions
+     * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} 
where we always use the non-prefixed
+     * version as we only support reading/writing from/to the same Kafka 
Cluster.
+     *
+     * @param streamThread the {@link StreamThread} creating a consumer
+     * @param groupId      consumer groupId
+     * @param clientId     clientId
+     * @return Map of the consumer configuration.
+     * @throws ConfigException if {@code "enable.auto.commit"} was specified 
by the user
+     */
+    public Map<String, Object> getConsumerConfigs(final StreamThread 
streamThread,
+                                                  final String groupId,
+                                                  final String clientId) 
throws ConfigException {
+        final Map<String, Object> consumerProps = getCommonConsumerConfigs();
+
         // add client id with stream client id prefix, and group id
         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + 
"-consumer");
 
         // add configs required for stream partition assignor
-        consumerProps.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, 
streamThread);
-        consumerProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 
getInt(REPLICATION_FACTOR_CONFIG));
-        consumerProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 
getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
+        consumerProps.put(REPLICATION_FACTOR_CONFIG, 
getInt(REPLICATION_FACTOR_CONFIG));
+        consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, 
getInt(NUM_STANDBY_REPLICAS_CONFIG));
         consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
StreamPartitionAssignor.class.getName());
-        
consumerProps.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
 getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
-        if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals("")) {
-            consumerProps.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, 
getString(ZOOKEEPER_CONNECT_CONFIG));
-        }
+        
consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 
getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
 
         consumerProps.put(APPLICATION_SERVER_CONFIG, 
getString(APPLICATION_SERVER_CONFIG));
+
         return consumerProps;
     }
 
-
     /**
-     * Get the consumer config for the restore-consumer. Properties using the 
prefix {@link StreamsConfig#CONSUMER_PREFIX}
-     * will be used in favor over their non-prefixed versions except in the 
case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG}
-     * where we always use the non-prefixed version as we only support 
reading/writing from/to the same Kafka Cluster
-     * @param clientId  clientId
-     * @return  Map of the Consumer configuration
-     * @throws ConfigException
+     * Get the configs for the {@link KafkaConsumer restore-consumer}.
+     * Properties using the prefix {@link #CONSUMER_PREFIX} will be used in 
favor over their non-prefixed versions
+     * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} 
where we always use the non-prefixed
+     * version as we only support reading/writing from/to the same Kafka 
Cluster.
+     *
+     * @param clientId clientId
+     * @return Map of the consumer configuration.
+     * @throws ConfigException if {@code "enable.auto.commit"} was specified 
by the user
      */
-    public Map<String, Object> getRestoreConsumerConfigs(String clientId) 
throws ConfigException {
-        Map<String, Object> consumerProps = new 
HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
-
-        final Map<String, Object> clientProvidedProps = 
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
-
-        // disable auto commit and throw exception if there is user overridden 
values,
-        // this is necessary for streams commit semantics
-        if 
(clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-            throw new ConfigException("Unexpected user-specified consumer 
config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
-                    + ", as the streams client will always turn off auto 
committing.");
-        }
-
-        consumerProps.putAll(clientProvidedProps);
-
-        // bootstrap.servers should be from StreamsConfig
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
+    public Map<String, Object> getRestoreConsumerConfigs(final String 
clientId) throws ConfigException {
+        final Map<String, Object> consumerProps = getCommonConsumerConfigs();
 
         // no need to set group id for a restore consumer
         consumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
-
         // add client id with stream client id prefix
         consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + 
"-restore-consumer");
 
         return consumerProps;
     }
 
-
     /**
-     * Get the configs for the Producer. Properties using the prefix {@link 
StreamsConfig#PRODUCER_PREFIX}
-     * will be used in favor over their non-prefixed versions except in the 
case of {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG}
-     * where we always use the non-prefixed version as we only support 
reading/writing from/to the same Kafka Cluster
-     * @param clientId  clientId
-     * @return  Map of the Consumer configuration
-     * @throws ConfigException
+     * Get the configs for the {@link KafkaProducer producer}.
+     * Properties using the prefix {@link #PRODUCER_PREFIX} will be used in 
favor over their non-prefixed versions
+     * except in the case of {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG} 
where we always use the non-prefixed
+     * version as we only support reading/writing from/to the same Kafka 
Cluster.
+     *
+     * @param clientId clientId
+     * @return Map of the producer configuration.
      */
-    public Map<String, Object> getProducerConfigs(String clientId) {
+    public Map<String, Object> getProducerConfigs(final String clientId) {
         // generate producer configs from original properties and overridden 
maps
         final Map<String, Object> props = new 
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
         props.putAll(getClientPropsWithPrefix(PRODUCER_PREFIX, 
ProducerConfig.configNames()));
 
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
originals().get(BOOTSTRAP_SERVERS_CONFIG));
         // add client id with stream client id prefix
         props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + 
"-producer");
 
         return props;
     }
 
-    private Map<String, Object> getClientPropsWithPrefix(final String prefix, 
final Set<String> configNames) {
+    private Map<String, Object> getClientPropsWithPrefix(final String prefix,
+                                                         final Set<String> 
configNames) {
         final Map<String, Object> props = clientProps(configNames, 
originals());
-        props.putAll(this.originalsWithPrefix(prefix));
+        props.putAll(originalsWithPrefix(prefix));
         return props;
     }
 
+    /**
+     * Return a {@link Serde#configure(Map, boolean) configured} instance of 
{@link #KEY_SERDE_CLASS_CONFIG key Serde
+     * class}.
+     *
+     * @return a configured instance of the key Serde class
+     */
     public Serde keySerde() {
         try {
-            Serde<?> serde = 
getConfiguredInstance(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serde.class);
+            final Serde<?> serde = 
getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class);
             serde.configure(originals(), true);
             return serde;
-        } catch (Exception e) {
-            throw new StreamsException(String.format("Failed to configure key 
serde %s", get(StreamsConfig.KEY_SERDE_CLASS_CONFIG)), e);
+        } catch (final Exception e) {
+            throw new StreamsException(String.format("Failed to configure key 
serde %s", get(KEY_SERDE_CLASS_CONFIG)), e);
         }
     }
 
+    /**
+     * Return a {@link Serde#configure(Map, boolean) configured} instance of 
{@link #VALUE_SERDE_CLASS_CONFIG value
+     * Serde class}.
+     *
+     * @return a configured instance of the value Serde class
+     */
     public Serde valueSerde() {
         try {
-            Serde<?> serde = 
getConfiguredInstance(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serde.class);
+            final Serde<?> serde = 
getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class);
             serde.configure(originals(), false);
             return serde;
-        } catch (Exception e) {
-            throw new StreamsException(String.format("Failed to configure 
value serde %s", get(StreamsConfig.VALUE_SERDE_CLASS_CONFIG)), e);
+        } catch (final Exception e) {
+            throw new StreamsException(String.format("Failed to configure 
value serde %s", get(VALUE_SERDE_CLASS_CONFIG)), e);
         }
     }
 
@@ -530,14 +591,15 @@ public class StreamsConfig extends AbstractConfig {
      * Override any client properties in the original configs with overrides
      *
      * @param configNames The given set of configuration names.
-     * @param originals The original configs to be filtered.
+     * @param originals   The original configs to be filtered.
      * @return client config with any overrides
      */
-    private Map<String, Object> clientProps(Set<String> configNames, 
Map<String, Object> originals) {
+    private Map<String, Object> clientProps(final Set<String> configNames,
+                                            final Map<String, Object> 
originals) {
         // iterate all client config names, filter out non-client configs from 
the original
         // property map and use the overridden values when they are not 
specified by users
-        Map<String, Object> parsed = new HashMap<>();
-        for (String configName: configNames) {
+        final Map<String, Object> parsed = new HashMap<>();
+        for (final String configName: configNames) {
             if (originals.containsKey(configName)) {
                 parsed.put(configName, originals.get(configName));
             }
@@ -546,7 +608,7 @@ public class StreamsConfig extends AbstractConfig {
         return parsed;
     }
 
-    public static void main(String[] args) {
+    public static void main(final String[] args) {
         System.out.println(CONFIG.toHtmlTable());
     }
 }

Reply via email to