Repository: kafka
Updated Branches:
  refs/heads/trunk 3e69ce801 -> d2774e302


KAFKA-5727: Add Streams quickstart tutorial as an archetype project

0. Minor fixes to the existing examples so they all read from a single input 
topic; also stop using `common.utils.Exit`, as it is intended for internal 
usage only.

1. Add the archetype project for the quickstart. Steps to try it out:

  a. Run `mvn install` in the quickstart directory.
  b. `mvn archetype:generate \
-DarchetypeGroupId=org.apache.kafka \
-DarchetypeArtifactId=streams-quickstart-java \
-DarchetypeVersion=1.0.0-SNAPSHOT \
-DgroupId=streams-quickstart \
-DartifactId=streams-quickstart \
-Dversion=0.1 \
-Dpackage=StreamsQuickstart \
-DinteractiveMode=false` in any directory to create the project.
  c. build the streams jar with version `1.0.0-SNAPSHOT` into the local maven 
repository with `./gradlew installAll`, then build the project: `cd 
streams-quickstart; mvn clean package`
  d. create the input / output topics (see the `AdminClient` sketch after this 
list), then start the console producer and consumer.
  e. start one of the programs, e.g. `mvn exec:java 
-Dexec.mainClass=StreamsQuickstart.Pipe` (likewise for `LineSplit` and 
`WordCount`).
  f. type data into the console producer and observe the output on the console 
consumer.
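
As a convenience for step d, the input / output topics can also be created 
programmatically with the Java `AdminClient` (a minimal sketch assuming a 
single local broker at `localhost:9092`; imports from 
`org.apache.kafka.clients.admin` and `java.util` are omitted):

    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    AdminClient admin = AdminClient.create(config);
    // a single partition and replica suffices for a local quickstart
    admin.createTopics(Arrays.asList(
        new NewTopic("streams-plaintext-input", 1, (short) 1),
        new NewTopic("streams-wordcount-output", 1, (short) 1))).all().get();
    admin.close();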

Author: Guozhang Wang <wangg...@gmail.com>

Reviewers: Damian Guy <damian....@gmail.com>, Bill Bejeck <bbej...@gmail.com>, 
Ewen Cheslack-Postava <m...@ewencp.org>, Eno Thereska <eno.there...@gmail.com>

Closes #3630 from guozhangwang/KMinor-streams-quickstart-tutorial


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2774e30
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2774e30
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2774e30

Branch: refs/heads/trunk
Commit: d2774e302fa528b4fe7b7db39f69cf679753f348
Parents: 3e69ce8
Author: Guozhang Wang <wangg...@gmail.com>
Authored: Fri Aug 11 12:19:28 2017 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Fri Aug 11 12:19:28 2017 -0700

----------------------------------------------------------------------
 README.md                                       |  30 +
 build.gradle                                    |   3 +-
 docs/documentation/streams/tutorial.html        |  19 +
 docs/js/templateData.js                         |   6 +-
 docs/streams/quickstart.html                    |  28 +-
 docs/streams/tutorial.html                      | 630 +++++++++++++++++++
 docs/toc.html                                   |  11 +-
 .../kafka/streams/examples/pipe/PipeDemo.java   |   9 +-
 .../examples/temperature/TemperatureDemo.java   |   5 +-
 .../examples/wordcount/WordCountDemo.java       |  18 +-
 .../wordcount/WordCountProcessorDemo.java       |   9 +-
 streams/quickstart/java/pom.xml                 |  36 ++
 .../META-INF/maven/archetype-metadata.xml       |  34 +
 .../main/resources/archetype-resources/pom.xml  | 136 ++++
 .../src/main/java/LineSplit.java                |  86 +++
 .../archetype-resources/src/main/java/Pipe.java |  67 ++
 .../src/main/java/WordCount.java                |  97 +++
 .../src/main/resources/log4j.properties         |  19 +
 .../projects/basic/archetype.properties         |  18 +
 .../src/test/resources/projects/basic/goal.txt  |   1 +
 streams/quickstart/pom.xml                      | 101 +++
 21 files changed, 1317 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 5b4e654..5ed30b2 100644
--- a/README.md
+++ b/README.md
@@ -127,6 +127,36 @@ Please note for this to work you should create/update 
`${GRADLE_USER_HOME}/gradl
     signing.password=
     signing.secretKeyRingFile=
 
+### Publishing the streams quickstart archetype artifact to maven ###
+For the Streams archetype project, Gradle cannot be used to upload to Maven; 
instead, run `mvn deploy` from the quickstart folder:
+
+    cd streams/quickstart
+    mvn deploy
+
+Please note that for this to work you should create/update your user Maven 
settings (typically, `${USER_HOME}/.m2/settings.xml`) to define the following 
variables:
+
+    <settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
+                           https://maven.apache.org/xsd/settings-1.0.0.xsd">
+    ...                           
+    <servers>
+       ...
+       <server>
+          <id>apache.snapshots.https</id>
+          <username>${maven_username}</username>
+          <password>${maven_password}</password>
+       </server>
+       <server>
+          <id>apache.releases.https</id>
+          <username>${maven_username}</username>
+          <password>${maven_password}</password>
+        </server>
+        ...
+     </servers>
+     ...
+
+
 ### Installing the jars to the local Maven repository ###
     ./gradlew installAll
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 0e36a85..48f3f2f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -121,7 +121,8 @@ if (new File('.git').exists()) {
         '**/README.md',
         '**/id_rsa',
         '**/id_rsa.pub',
-        'checkstyle/suppressions.xml'
+        'checkstyle/suppressions.xml',
+        'streams/quickstart/java/src/test/resources/projects/basic/goal.txt'
     ])
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/docs/documentation/streams/tutorial.html
----------------------------------------------------------------------
diff --git a/docs/documentation/streams/tutorial.html 
b/docs/documentation/streams/tutorial.html
new file mode 100644
index 0000000..90f408d
--- /dev/null
+++ b/docs/documentation/streams/tutorial.html
@@ -0,0 +1,19 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- should always link to the latest release's documentation -->
+<!--#include virtual="../../streams/tutorial.html" -->

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/docs/js/templateData.js
----------------------------------------------------------------------
diff --git a/docs/js/templateData.js b/docs/js/templateData.js
index 50997bd..d727512 100644
--- a/docs/js/templateData.js
+++ b/docs/js/templateData.js
@@ -17,8 +17,8 @@ limitations under the License.
 
 // Define variables for doc templates
 var context={
-    "version": "0110",
-    "dotVersion": "0.11.0",
-    "fullDotVersion": "0.11.0.0"
+    "version": "100",
+    "dotVersion": "1.0",
+    "fullDotVersion": "1.0.0"
     "scalaVersion:" "2.11"
 };

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/docs/streams/quickstart.html
----------------------------------------------------------------------
diff --git a/docs/streams/quickstart.html b/docs/streams/quickstart.html
index 031a375..977fa5f 100644
--- a/docs/streams/quickstart.html
+++ b/docs/streams/quickstart.html
@@ -40,10 +40,10 @@ of the <code><a 
href="https://github.com/apache/kafka/blob/{{dotVersion}}/stream
 final Serde&lt;String&gt; stringSerde = Serdes.String();
 final Serde&lt;Long&gt; longSerde = Serdes.Long();
 
-// Construct a `KStream` from the input topic "streams-wordcount-input", where 
message values
+// Construct a `KStream` from the input topic "streams-plaintext-input", where 
message values
 // represent lines of text (for the sake of this example, we ignore whatever 
may be stored
 // in the message keys).
-KStream&lt;String, String&gt; textLines = builder.stream(stringSerde, 
stringSerde, "streams-wordcount-input");
+KStream&lt;String, String&gt; textLines = builder.stream(stringSerde, 
stringSerde, "streams-plaintext-input");
 
 KTable&lt;String, Long&gt; wordCounts = textLines
     // Split each text line, by whitespace, into words.
@@ -120,15 +120,15 @@ Or on Windows:
 
 -->
 
-Next, we create the input topic named <b>streams-wordcount-input</b> and the 
output topic named <b>streams-wordcount-output</b>:
+Next, we create the input topic named <b>streams-plaintext-input</b> and the 
output topic named <b>streams-wordcount-output</b>:
 
 <pre class="brush: bash;">
 &gt; bin/kafka-topics.sh --create \
     --zookeeper localhost:2181 \
     --replication-factor 1 \
     --partitions 1 \
-    --topic streams-wordcount-input
-Created topic "streams-wordcount-input".
+    --topic streams-plaintext-input
+Created topic "streams-plaintext-input".
 
 &gt; bin/kafka-topics.sh --create \
     --zookeeper localhost:2181 \
@@ -143,8 +143,8 @@ The created topic can be described with the same 
<b>kafka-topics</b> tool:
 <pre class="brush: bash;">
 &gt; bin/kafka-topics.sh --zookeeper localhost:2181 --describe
 
-Topic:streams-wordcount-input  PartitionCount:1        ReplicationFactor:1     
Configs:
-    Topic: streams-wordcount-input     Partition: 0    Leader: 0       
Replicas: 0     Isr: 0
+Topic:streams-plaintext-input  PartitionCount:1        ReplicationFactor:1     
Configs:
+    Topic: streams-plaintext-input     Partition: 0    Leader: 0       
Replicas: 0     Isr: 0
 Topic:streams-wordcount-output PartitionCount:1        ReplicationFactor:1     
Configs:
        Topic: streams-wordcount-output Partition: 0    Leader: 0       
Replicas: 0     Isr: 0
 </pre>
@@ -158,7 +158,7 @@ The following command starts the WordCount demo application:
 </pre>
 
 <p>
-The demo application will read from the input topic 
<b>streams-wordcount-input</b>, perform the computations of the WordCount 
algorithm on each of the read messages,
+The demo application will read from the input topic 
<b>streams-plaintext-input</b>, perform the computations of the WordCount 
algorithm on each of the read messages,
 and continuously write its current results to the output topic 
<b>streams-wordcount-output</b>.
Hence there won't be any STDOUT output except log entries as the results are 
written back into Kafka.
 </p>
@@ -166,7 +166,7 @@ Hence there won't be any STDOUT output except log entries 
as the results are wri
 Now we can start the console producer in a separate terminal to write some 
input data to this topic:
 
 <pre class="brush: bash;">
-&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
streams-wordcount-input
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
streams-plaintext-input
 </pre>
 
 and inspect the output of the WordCount demo application by reading from its 
output topic with the console consumer in a separate terminal:
@@ -185,12 +185,12 @@ and inspect the output of the WordCount demo application 
by reading from its out
 
 <h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step 
5: Process some data</a></h4>
 
-Now let's write some message with the console producer into the input topic 
<b>streams-wordcount-input</b> by entering a single line of text and then hit 
&lt;RETURN&gt;.
+Now let's write some message with the console producer into the input topic 
<b>streams-plaintext-input</b> by entering a single line of text and then hit 
&lt;RETURN&gt;.
 This will send a new message to the input topic, where the message key is null 
and the message value is the string encoded text line that you just entered
 (in practice, input data for applications will typically be streaming 
continuously into Kafka, rather than being manually entered as we do in this 
quickstart):
 
 <pre class="brush: bash;">
-&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
streams-wordcount-input
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
streams-plaintext-input
 all streams lead to kafka
 </pre>
 
@@ -219,12 +219,12 @@ kafka     1
 Here, the first column is the Kafka message key in 
<code>java.lang.String</code> format and represents a word that is being 
counted, and the second column is the message value in 
<code>java.lang.Long</code> format, representing the word's latest count.
 </p>
 
-Now let's continue writing one more message with the console producer into the 
input topic <b>streams-wordcount-input</b>.
+Now let's continue writing one more message with the console producer into the 
input topic <b>streams-plaintext-input</b>.
 Enter the text line "hello kafka streams" and hit &lt;RETURN&gt;.
 Your terminal should look as follows:
 
 <pre class="brush: bash;">
-&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
streams-wordcount-input
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
streams-plaintext-input
 all streams lead to kafka
 hello kafka streams
 </pre>
@@ -321,7 +321,7 @@ Looking beyond the scope of this concrete example, what 
Kafka Streams is doing h
 
  <div class="pagination">
         <a href="/{{version}}/documentation/streams" class="pagination__btn 
pagination__btn__prev">Previous</a>
-        <a href="/{{version}}/documentation/streams/developer-guide" 
class="pagination__btn pagination__btn__next">Next</a>
+        <a href="/{{version}}/documentation/streams/tutorial" 
class="pagination__btn pagination__btn__next">Next</a>
     </div>
 </script>
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/docs/streams/tutorial.html
----------------------------------------------------------------------
diff --git a/docs/streams/tutorial.html b/docs/streams/tutorial.html
new file mode 100644
index 0000000..a1f4880
--- /dev/null
+++ b/docs/streams/tutorial.html
@@ -0,0 +1,630 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<script><!--#include virtual="../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+    <h1>Write your own Streams Applications</h1>
+
+    <p>
+        In this guide we will set up your own project from scratch to write a 
stream processing application using Kafka Streams.
+        If you have not done so already, it is highly recommended to read the <a 
href="/{{version}}/documentation/streams/quickstart">quickstart</a> first on 
how to run an existing Streams application.
+    </p>
+
+    <h4><a id="tutorial_maven_setup" href="#tutorial_maven_setup">Setting up a 
Maven Project</a></h4>
+
+    <p>
+        We are going to use the Kafka Streams Maven archetype to create a 
Streams project structure, with the following command:
+    </p>
+
+    <pre class="brush: bash;">
+        mvn archetype:generate \
+            -DarchetypeGroupId=org.apache.kafka \
+            -DarchetypeArtifactId=streams-quickstart-java \
+            -DarchetypeVersion={{fullDotVersion}} \
+            -DgroupId=streams.examples \
+            -DartifactId=streams.examples \
+            -Dversion=0.1 \
+            -Dpackage=myapps
+    </pre>
+
+    <p>
+        You can use a different value for <code>groupId</code>, 
<code>artifactId</code> and <code>package</code> parameters if you like.
+        Assuming the above parameter values are used, this command will create 
a project structure that looks like this:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; tree streams.examples
+        streams.examples
+        ├── pom.xml
+        └── src
+            └── main
+                ├── java
+                │   └── myapps
+                │       ├── LineSplit.java
+                │       ├── Pipe.java
+                │       └── WordCount.java
+                └── resources
+                    └── log4j.properties
+    </pre>
+
+    <p>
+        The <code>pom.xml</code> file included in the project already has the 
Streams dependency defined,
+        and there are already several example programs written with the Streams 
library under <code>src/main/java</code>.
+        Since we are going to start writing such programs from scratch, we can 
now delete these examples:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; cd streams.examples
+        &gt; rm src/main/java/myapps/*.java
+    </pre>
+
+    <h4><a id="tutorial_code_pipe" href="#tutorial_code_pipe">Writing a first 
Streams application: Pipe</a></h4>
+
+    It's coding time now! Feel free to open your favorite IDE and import this 
Maven project, or simply open a text editor and create a Java file under 
<code>src/main/java</code>.
+    Let's name it <code>Pipe.java</code>:
+
+    <pre class="brush: java;">
+        package myapps;
+
+        public class Pipe {
+
+            public static void main(String[] args) throws Exception {
+
+            }
+        }
+    </pre>
+
+    <p>
+        We are going to fill in the <code>main</code> function to write this 
pipe program. Note that we will not list the import statements as we go since 
IDEs can usually add them automatically.
+        However, if you are using a text editor you need to add the imports 
manually, and at the end of this section we'll show the complete code snippet 
with import statements.
+    </p>
+
+    <p>
+        The first step to write a Streams application is to create a 
<code>java.util.Properties</code> map to specify different Streams execution 
configuration values as defined in <code>StreamsConfig</code>.
+        A couple of important configuration values you need to set are: 
<code>StreamsConfig.BOOTSTRAP_SERVERS_CONFIG</code>, which specifies a list of 
host/port pairs to use for establishing the initial connection to the Kafka 
cluster,
+        and <code>StreamsConfig.APPLICATION_ID_CONFIG</code>, which gives the 
unique identifier of your Streams application, distinguishing it from other 
applications talking to the same Kafka cluster:
+    </p>
+
+    <pre class="brush: java;">
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   
 // assuming that the Kafka broker this application is talking to runs on local 
machine with port 9092
+    </pre>
+
+    <p>
+        In addition, you can customize other configurations in the same map, 
for example, default serialization and deserialization libraries for the record 
key-value pairs:
+    </p>
+
+    <pre class="brush: java;">
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+    </pre>
+
+    <p>
+        For a full list of configurations of Kafka Streams please refer to 
this <a href="/{{version}}/documentation/#streamsconfigs">table</a>.
+    </p>
+
+    <p>
+        Next we will define the computational logic of our Streams application.
+        In Kafka Streams this computational logic is defined as a 
<code>topology</code> of connected processor nodes.
+        We can use a topology builder to construct such a topology,
+    </p>
+
+    <pre class="brush: java;">
+        final StreamsBuilder builder = new StreamsBuilder();
+    </pre>
+
+    <p>
+        And then create a source stream from a Kafka topic named 
<code>streams-plaintext-input</code> using this topology builder:
+    </p>
+
+    <pre class="brush: java;">
+        KStream&lt;String, String&gt; source = 
builder.stream("streams-plaintext-input");
+    </pre>
+
+    <p>
+        Now we get a <code>KStream</code> that is continuously generating 
records from its source Kafka topic <code>streams-plaintext-input</code>.
+        The records are organized as <code>String</code> typed key-value pairs.
+        The simplest thing we can do with this stream is to write it into 
another Kafka topic, say it's named <code>streams-pipe-output</code>:
+    </p>
+
+    <pre class="brush: java;">
+        source.to("streams-pipe-output");
+    </pre>
+
+    <p>
+        Note that we can also concatenate the above two lines into a single 
line as:
+    </p>
+
+    <pre class="brush: java;">
+        builder.stream("streams-plaintext-input").to("streams-pipe-output");
+    </pre>
+
+    <p>
+        We can inspect what kind of <code>topology</code> is created from this 
builder by doing the following:
+    </p>
+
+    <pre class="brush: java;">
+        final Topology topology = builder.build();
+    </pre>
+
+    <p>
+        And print its description to standard output as:
+    </p>
+
+    <pre class="brush: java;">
+        System.out.println(topology.describe());
+    </pre>
+
+    <p>
+        If we just stop here, compile and run the program, it will output the 
following information:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; mvn clean package
+        &gt; mvn exec:java -Dexec.mainClass=myapps.Pipe
+        Sub-topologies:
+          Sub-topology: 0
+            Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) 
--> KSTREAM-SINK-0000000001
+            Sink: KSTREAM-SINK-0000000001(topic: streams-pipe-output) <-- 
KSTREAM-SOURCE-0000000000
+        Global Stores:
+          none
+    </pre>
+
+    <p>
+        As shown above, the output illustrates that the constructed topology has two 
processor nodes, a source node <code>KSTREAM-SOURCE-0000000000</code> and a 
sink node <code>KSTREAM-SINK-0000000001</code>.
+        <code>KSTREAM-SOURCE-0000000000</code> continuously reads records from 
the Kafka topic <code>streams-plaintext-input</code> and pipes them to its 
downstream node <code>KSTREAM-SINK-0000000001</code>;
+        <code>KSTREAM-SINK-0000000001</code> will write each of its received 
records in order to another Kafka topic <code>streams-pipe-output</code>
+        (the <code>--&gt;</code> and <code>&lt;--</code> arrows indicate the 
downstream and upstream processor nodes of this node, i.e. its "children" and 
"parents" within the topology graph).
+        It also illustrates that this simple topology has no global state 
stores associated with it (we will talk about state stores more in the 
following sections).
+    </p>
+
+    <p>
+        Note that we can always describe the topology as we did above at any 
given point while we are building it in the code, so as a user you can 
interactively "try and taste" the computational logic defined in the topology 
until you are happy with it.
+        Assuming we are done with this simple topology that just pipes data 
from one Kafka topic to another in an endless streaming manner,
+        we can now construct the Streams client with the two components we 
have just constructed above: the configuration map and the topology object
+        (one can also construct a <code>StreamsConfig</code> object from the 
<code>props</code> map and then pass that object to the constructor;
+        <code>KafkaStreams</code> has overloaded constructors that take either 
type).
+    </p>
+
+    <pre class="brush: java;">
+        final KafkaStreams streams = new KafkaStreams(topology, props);
+    </pre>
+
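+    <p>
+        The overloaded constructor mentioned above can be used as follows (a 
minimal sketch, assuming the same <code>props</code> and <code>topology</code> 
objects as before):
+    </p>
+
+    <pre class="brush: java;">
+        // construct an explicit StreamsConfig from the Properties map
+        StreamsConfig config = new StreamsConfig(props);
+        // KafkaStreams accepts either the Properties map or the StreamsConfig
+        final KafkaStreams streams = new KafkaStreams(topology, config);
+    </pre>
+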
+    <p>
+        By calling its <code>start()</code> function we can trigger the 
execution of this client.
+        The execution won't stop until <code>close()</code> is called on this 
client.
+        We can, for example, add a shutdown hook with a countdown latch to 
capture a user interrupt and close the client upon terminating this program:
+    </p>
+
+    <pre class="brush: java;">
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new 
Thread("streams-shutdown-hook") {
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
+
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            System.exit(1);
+        }
+        System.exit(0);
+    </pre>
+
+    <p>
+        The complete code so far looks like this:
+    </p>
+
+    <pre class="brush: java;">
+        package myapps;
+
+        import org.apache.kafka.common.serialization.Serdes;
+        import org.apache.kafka.streams.KafkaStreams;
+        import org.apache.kafka.streams.StreamsBuilder;
+        import org.apache.kafka.streams.StreamsConfig;
+        import org.apache.kafka.streams.Topology;
+
+        import java.util.Properties;
+        import java.util.concurrent.CountDownLatch;
+
+        public class Pipe {
+
+            public static void main(String[] args) throws Exception {
+                Properties props = new Properties();
+                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
+                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092");
+                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+
+                final StreamsBuilder builder = new StreamsBuilder();
+
+                
builder.stream("streams-plaintext-input").to("streams-pipe-output");
+
+                final Topology topology = builder.build();
+
+                final KafkaStreams streams = new KafkaStreams(topology, props);
+                final CountDownLatch latch = new CountDownLatch(1);
+
+                // attach shutdown handler to catch control-c
+                Runtime.getRuntime().addShutdownHook(new 
Thread("streams-shutdown-hook") {
+                    @Override
+                    public void run() {
+                        streams.close();
+                        latch.countDown();
+                    }
+                });
+
+                try {
+                    streams.start();
+                    latch.await();
+                } catch (Throwable e) {
+                    System.exit(1);
+                }
+                System.exit(0);
+            }
+        }
+    </pre>
+
+    <p>
+        If you already have the Kafka broker up and running at 
<code>localhost:9092</code>,
+        and the topics <code>streams-plaintext-input</code> and 
<code>streams-pipe-output</code> created on that broker,
+        you can run this code in your IDE or on the command line, using Maven:
+    </p>
+
+    <pre class="brush: brush;">
+        &gt; mvn clean package
+        &gt; mvn exec:java -Dexec.mainClass=myapps.Pipe
+    </pre>
+
+    <p>
+        For detailed instructions on how to run a Streams application and 
observe its computed results,
+        please read the <a 
href="/{{version}}/documentation/streams/quickstart">Play with a Streams 
Application</a> section.
+        We will not talk about this in the rest of this section.
+    </p>
+
+    <h4><a id="tutorial_code_linesplit" 
href="#tutorial_code_linesplit">Writing a second Streams application: Line 
Split</a></h4>
+
+    <p>
+        We have learned how to construct a Streams client with its two key 
components: the <code>StreamsConfig</code> and <code>Topology</code>.
+        Now let's move on to add some real processing logic by augmenting the 
current topology.
+        We can create another program by copying the existing 
<code>Pipe.java</code> class:
+    </p>
+
+    <pre class="brush: brush;">
+        &gt; cp src/main/java/myapps/Pipe.java 
src/main/java/myapps/LineSplit.java
+    </pre>
+
+    <p>
+        And change its class name as well as the application id config to 
distinguish it from the original program:
+    </p>
+
+    <pre class="brush: java;">
+        public class LineSplit {
+
+            public static void main(String[] args) throws Exception {
+                Properties props = new Properties();
+                props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"streams-linesplit");
+                // ...
+            }
+        }
+    </pre>
+
+    <p>
+        Since each of the source stream's records is a <code>String</code>-typed 
key-value pair,
+        let's treat the value string as a text line and split it into words 
with a <code>FlatMapValues</code> operator:
+    </p>
+
+    <pre class="brush: java;">
+        KStream&lt;String, String&gt; source = 
builder.stream("streams-plaintext-input");
+        KStream&lt;String, String&gt; words = source.flatMapValues(new 
ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
+                    @Override
+                    public Iterable&lt;String&gt; apply(String value) {
+                        return Arrays.asList(value.split("\\W+"));
+                    }
+                });
+    </pre>
+
+    <p>
+        The operator will take the <code>source</code> stream as its input, 
and generate a new stream named <code>words</code>
+        by processing each record from its source stream in order and breaking 
its value string into a list of words, and producing
+        each word as a new record to the output <code>words</code> stream.
+        This is a stateless operator that does not need to keep track of any 
previously received records or processed results.
+        Note that if you are using JDK 8 you can use a lambda expression and 
simplify the above code as:
+    </p>
+
+    <pre class="brush: java;">
+        KStream&lt;String, String&gt; source = 
builder.stream("streams-plaintext-input");
+        KStream&lt;String, String&gt; words = source.flatMapValues(value -> 
Arrays.asList(value.split("\\W+")));
+    </pre>
+
+    <p>
+        And finally we can write the word stream back into another Kafka 
topic, say <code>streams-linesplit-output</code>.
+        Again, these two steps can be concatenated as the following (assuming 
lambda expression is used):
+    </p>
+
+    <pre class="brush: java;">
+        KStream&lt;String, String&gt; source = 
builder.stream("streams-plaintext-input");
+        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
+              .to("streams-linesplit-output");
+    </pre>
+
+    <p>
+        If we now describe this augmented topology with 
<code>System.out.println(topology.describe())</code>, we will get the following:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; mvn clean package
+        &gt; mvn exec:java -Dexec.mainClass=myapps.LineSplit
+        Sub-topologies:
+          Sub-topology: 0
+            Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) 
--> KSTREAM-FLATMAPVALUES-0000000001
+            Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> 
KSTREAM-SINK-0000000002 <-- KSTREAM-SOURCE-0000000000
+            Sink: KSTREAM-SINK-0000000002(topic: streams-linesplit-output) <-- 
KSTREAM-FLATMAPVALUES-0000000001
+          Global Stores:
+            none
+    </pre>
+
+    <p>
+        As we can see above, a new processor node 
<code>KSTREAM-FLATMAPVALUES-0000000001</code> is injected into the topology 
between the original source and sink nodes.
+        It takes the source node as its parent and the sink node as its child.
+        In other words, each record fetched by the source node will first 
traverse to the newly added <code>KSTREAM-FLATMAPVALUES-0000000001</code> node 
to be processed,
+        and one or more new records will be generated as a result. They will 
continue traversing down to the sink node to be written back to Kafka.
+        Note this processor node is "stateless" as it is not associated with 
any stores (i.e. <code>(stores: [])</code>).
+    </p>
+
+    <p>
+        The complete code looks like this (assuming lambda expression is used):
+    </p>
+
+    <pre class="brush: java;">
+        package myapps;
+
+        import org.apache.kafka.common.serialization.Serdes;
+        import org.apache.kafka.streams.KafkaStreams;
+        import org.apache.kafka.streams.StreamsBuilder;
+        import org.apache.kafka.streams.StreamsConfig;
+        import org.apache.kafka.streams.Topology;
+        import org.apache.kafka.streams.kstream.KStream;
+
+        import java.util.Arrays;
+        import java.util.Properties;
+        import java.util.concurrent.CountDownLatch;
+
+        public class LineSplit {
+
+            public static void main(String[] args) throws Exception {
+                Properties props = new Properties();
+                props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"streams-linesplit");
+                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092");
+                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+
+                final StreamsBuilder builder = new StreamsBuilder();
+
+                KStream&lt;String, String&gt; source = 
builder.stream("streams-plaintext-input");
+                source.flatMapValues(value -> 
Arrays.asList(value.split("\\W+")))
+                      .to("streams-linesplit-output");
+
+                final Topology topology = builder.build();
+                final KafkaStreams streams = new KafkaStreams(topology, props);
+                final CountDownLatch latch = new CountDownLatch(1);
+
+                // ... the rest is the same as in Pipe.java shown above
+            }
+        }
+    </pre>
+
+    <h4><a id="tutorial_code_wordcount" 
href="#tutorial_code_wordcount">Writing a third Streams application: 
WordCount</a></h4>
+
+    <p>
+        Let's now take a step further to add some "stateful" computations to 
the topology by counting the occurrences of the words split from the source text 
stream.
+        Following similar steps let's create another program based on the 
<code>LineSplit.java</code> class:
+    </p>
+
+    <pre class="brush: java;">
+        public class WordCount {
+
+            public static void main(String[] args) throws Exception {
+                Properties props = new Properties();
+                props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"streams-wordcount");
+                // ...
+            }
+        }
+    </pre>
+
+    <p>
+        In order to count the words we can first modify the 
<code>flatMapValues</code> operator to treat all of them as lower case:
+    </p>
+
+    <pre class="brush: java;">
+        source.flatMapValues(new ValueMapper&lt;String, 
Iterable&lt;String&gt;&gt;() {
+                    @Override
+                    public Iterable&lt;String&gt; apply(String value) {
+                        return 
Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+                    }
+                });
+    </pre>
+
+    <p>
+        In order to do the counting aggregation we have to first specify that 
we want to key the stream on the value string, i.e. the lower cased word, with 
a <code>groupBy</code> operator.
+        This operator generates a new grouped stream, which can then be 
aggregated by a <code>count</code> operator, which generates a running count on 
each of the grouped keys:
+    </p>
+
+    <pre class="brush: java;">
+        KTable&lt;String, Long&gt; counts =
+        source.flatMapValues(new ValueMapper&lt;String, 
Iterable&lt;String&gt;&gt;() {
+                    @Override
+                    public Iterable&lt;String&gt; apply(String value) {
+                        return 
Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+                    }
+                })
+              .groupBy(new KeyValueMapper&lt;String, String, String&gt;() {
+                   @Override
+                   public String apply(String key, String value) {
+                       return value;
+                   }
+                })
+              .count("Counts");
+    </pre>
+
+    <p>
+        Note that the <code>count</code> operator takes a <code>String</code> 
parameter <code>Counts</code>, which names the state store
+        that holds the running counts; the store keeps being updated as more 
records are piped and processed from the source Kafka topic.
+        This <code>Counts</code> store can be queried in real-time, with 
details described in the <a 
href="/{{version}}/documentation/streams/developer-guide#streams_interactive_queries">Developer
 Manual</a>.
+    </p>
+
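+    <p>
+        As a minimal sketch of such a query (assuming the running 
<code>streams</code> client from this tutorial, with imports from 
<code>org.apache.kafka.streams.state</code> omitted), the store can be fetched 
through the interactive queries API:
+    </p>
+
+    <pre class="brush: java;">
+        // fetch the "Counts" key-value store from the running streams client
+        ReadOnlyKeyValueStore&lt;String, Long&gt; counts =
+            streams.store("Counts", QueryableStoreTypes.&lt;String, Long&gt;keyValueStore());
+        // look up the current count for a word; null if it has not been seen yet
+        Long count = counts.get("kafka");
+    </pre>
+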
+    <p>
+        We can also write the <code>counts</code> KTable's changelog stream 
back into another Kafka topic, say <code>streams-wordcount-output</code>.
+        Note that this time the value type is no longer <code>String</code> 
but <code>Long</code>, so the default serialization classes are not viable for 
writing it to Kafka anymore.
+        We need to provide explicit serialization classes for the 
<code>Long</code> value type, otherwise a runtime exception will be thrown:
+    </p>
+
+    <pre class="brush: java;">
+        counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
+    </pre>
+
+    <p>
+        Note that in order to read the changelog stream from topic 
<code>streams-wordcount-output</code>,
+        one needs to set the value deserialization as 
<code>org.apache.kafka.common.serialization.LongDeserializer</code>.
+        Details of this can be found in the <a 
href="/{{version}}/documentation/streams/quickstart">Play with a Streams 
Application</a> section.
+        Assuming lambda expressions from JDK 8 can be used, the above code can 
be simplified as:
+    </p>
+
+    <pre class="brush: java;">
+        KStream&lt;String, String&gt; source = 
builder.stream("streams-plaintext-input");
+        source.flatMapValues(value -> 
Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+              .groupBy((key, value) -> value)
+              .count("Counts")
+              .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
+    </pre>
+
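+    <p>
+        As noted above, a reader of <code>streams-wordcount-output</code> must 
deserialize the values as longs. The following is a minimal consumer sketch 
under that assumption (the group id and poll timeout are arbitrary; imports 
are omitted):
+    </p>
+
+    <pre class="brush: java;">
+        Properties consumerProps = new Properties();
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-reader");
+        // keys are words (String), values are running counts (Long)
+        KafkaConsumer&lt;String, Long&gt; consumer =
+            new KafkaConsumer&lt;&gt;(consumerProps, new StringDeserializer(), new LongDeserializer());
+        consumer.subscribe(Collections.singletonList("streams-wordcount-output"));
+        for (ConsumerRecord&lt;String, Long&gt; record : consumer.poll(5000))
+            System.out.println(record.key() + ": " + record.value());
+        consumer.close();
+    </pre>
+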
+    <p>
+        If we again describe this augmented topology with 
<code>System.out.println(topology.describe())</code>, we will get the following:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; mvn clean package
+        &gt; mvn exec:java -Dexec.mainClass=myapps.WordCount
+        Sub-topologies:
+          Sub-topology: 0
+            Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) 
--> KSTREAM-FLATMAPVALUES-0000000001
+            Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> 
KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
+            Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> 
KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
+            Processor: KSTREAM-FILTER-0000000005(stores: []) --> 
KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
+            Sink: KSTREAM-SINK-0000000004(topic: Counts-repartition) <-- 
KSTREAM-FILTER-0000000005
+          Sub-topology: 1
+            Source: KSTREAM-SOURCE-0000000006(topics: Counts-repartition) --> 
KSTREAM-AGGREGATE-0000000003
+            Processor: KSTREAM-AGGREGATE-0000000003(stores: [Counts]) --> 
KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
+            Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> 
KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
+            Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- 
KTABLE-TOSTREAM-0000000007
+        Global Stores:
+          none
+    </pre>
+
+    <p>
+        As we can see above, the topology now contains two disconnected 
sub-topologies.
+        The first sub-topology's sink node 
<code>KSTREAM-SINK-0000000004</code> will write to a repartition topic 
<code>Counts-repartition</code>,
+        which will be read by the second sub-topology's source node 
<code>KSTREAM-SOURCE-0000000006</code>.
+        The repartition topic is used to "shuffle" the source stream by its 
aggregation key, which is in this case the value string.
+        In addition, inside the first sub-topology a stateless 
<code>KSTREAM-FILTER-0000000005</code> node is injected between the grouping 
<code>KSTREAM-KEY-SELECT-0000000002</code> node and the sink node to filter out 
any intermediate record whose aggregate key is empty.
+    </p>
+    <p>
+        In the second sub-topology, the aggregation node 
<code>KSTREAM-AGGREGATE-0000000003</code> is associated with a state store 
named <code>Counts</code> (the name is specified by the user in the 
<code>count</code> operator).
+        Upon receiving each record from its upstream source node, the 
aggregation processor will first query its associated <code>Counts</code> store 
to get the current count for that key, increment it by one, and then write the 
new count back to the store.
+        Each updated count for the key will also be piped downstream to the 
<code>KTABLE-TOSTREAM-0000000007</code> node, which interprets this update 
stream as a record stream before piping it further to the sink node 
<code>KSTREAM-SINK-0000000008</code> for writing back to Kafka.
+    </p>
+
+    <p>
+        The complete code looks like this (assuming lambda expression is used):
+    </p>
+
+    <pre class="brush: java;">
+        package myapps;
+
+        import org.apache.kafka.common.serialization.Serdes;
+        import org.apache.kafka.streams.KafkaStreams;
+        import org.apache.kafka.streams.StreamsBuilder;
+        import org.apache.kafka.streams.StreamsConfig;
+        import org.apache.kafka.streams.Topology;
+        import org.apache.kafka.streams.kstream.KStream;
+
+        import java.util.Arrays;
+        import java.util.Locale;
+        import java.util.Properties;
+        import java.util.concurrent.CountDownLatch;
+
+        public class WordCount {
+
+            public static void main(String[] args) throws Exception {
+                Properties props = new Properties();
+                props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"streams-wordcount");
+                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092");
+                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+
+                final StreamsBuilder builder = new StreamsBuilder();
+
+                KStream&lt;String, String&gt; source = 
builder.stream("streams-plaintext-input");
+                source.flatMapValues(value -> 
Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+                      .groupBy((key, value) -> value)
+                      .count("Counts")
+                      .to(Serdes.String(), Serdes.Long(), 
"streams-wordcount-output");
+
+                final Topology topology = builder.build();
+                final KafkaStreams streams = new KafkaStreams(topology, props);
+                final CountDownLatch latch = new CountDownLatch(1);
+
+                // ... the rest is the same as in Pipe.java shown above
+            }
+        }
+    </pre>
+
+    <div class="pagination">
+        <a href="/{{version}}/documentation/streams/quickstart" 
class="pagination__btn pagination__btn__prev">Previous</a>
+        <a href="/{{version}}/documentation/streams/developer-guide" 
class="pagination__btn pagination__btn__next">Next</a>
+    </div>
+</script>
+
+<div class="p-quickstart-streams"></div>
+
+<!--#include virtual="../../includes/_header.htm" -->
+<!--#include virtual="../../includes/_top.htm" -->
+<div class="content documentation documentation--current">
+    <!--#include virtual="../../includes/_nav.htm" -->
+    <div class="right">
+        <!--#include virtual="../../includes/_docs_banner.htm" -->
+        <ul class="breadcrumbs">
+            <li><a href="/documentation">Documentation</a></li>
+            <li><a href="/documentation/streams">Streams</a></li>
+        </ul>
+        <div class="p-content"></div>
+    </div>
+</div>
+<!--#include virtual="../../includes/_footer.htm" -->
+<script>
+$(function() {
+  // Show selected style on nav item
+  $('.b-nav__streams').addClass('selected');
+
+  // Display docs subnav items
+  $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+});
+</script>

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/docs/toc.html
----------------------------------------------------------------------
diff --git a/docs/toc.html b/docs/toc.html
index 2ec0129..5704768 100644
--- a/docs/toc.html
+++ b/docs/toc.html
@@ -141,13 +141,14 @@
                 <li><a href="#connect_development">8.3 Connector Development 
Guide</a></li>
             </ul>
         </li>
-        <li><a href="/{{version}}/documentation/streams">9. Kafka Streams</a>
+        <li><a href="/documentation/streams">9. Kafka Streams</a>
             <ul>
                 <li><a 
href="/{{version}}/documentation/streams/quickstart">9.1 Play with a Streams 
Application</a></li>
-                <li><a 
href="/{{version}}/documentation/streams/developer-guide">9.2 Developer 
Guide</a></li>
-                <li><a 
href="/{{version}}/documentation/streams/core-concepts">9.3 Core 
Concepts</a></li>
-                <li><a 
href="/{{version}}/documentation/streams/architecture">9.4 Architecture</a></li>
-                <li><a 
href="/{{version}}/documentation/streams/upgrade-guide">9.5 Upgrade Guide and 
API Changes</a></li>
+                <li><a href="/{{version}}/documentation/streams/tutorial">9.2 
Write your own Streams Applications</a></li>
+                <li><a 
href="/{{version}}/documentation/streams/developer-guide">9.3 Developer 
Manual</a></li>
+                <li><a 
href="/{{version}}/documentation/streams/core-concepts">9.4 Core 
Concepts</a></li>
+                <li><a 
href="/{{version}}/documentation/streams/architecture">9.5 Architecture</a></li>
+                <li><a 
href="/{{version}}/documentation/streams/upgrade-guide">9.6 Upgrade 
Guide</a></li>
             </ul>
         </li>
     </ul>

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
----------------------------------------------------------------------
diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
index 0831e3b..5389877 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.examples.pipe;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -51,13 +50,13 @@ public class PipeDemo {
 
         StreamsBuilder builder = new StreamsBuilder();
 
-        builder.stream("streams-file-input").to("streams-pipe-output");
+        builder.stream("streams-plaintext-input").to("streams-pipe-output");
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         final CountDownLatch latch = new CountDownLatch(1);
 
         // attach shutdown handler to catch control-c
-        Runtime.getRuntime().addShutdownHook(new 
Thread("streams-wordcount-shutdown-hook") {
+        Runtime.getRuntime().addShutdownHook(new 
Thread("streams-pipe-shutdown-hook") {
             @Override
             public void run() {
                 streams.close();
@@ -69,8 +68,8 @@ public class PipeDemo {
             streams.start();
             latch.await();
         } catch (Throwable e) {
-            Exit.exit(1);
+            System.exit(1);
         }
-        Exit.exit(0);
+        System.exit(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
----------------------------------------------------------------------
diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
index 764210b..1c2045e 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.examples.temperature;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -137,8 +136,8 @@ public class TemperatureDemo {
             streams.start();
             latch.await();
         } catch (Throwable e) {
-            Exit.exit(1);
+            System.exit(1);
         }
-        Exit.exit(0);
+        System.exit(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index e3cf60c..5689d50 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -18,9 +18,7 @@ package org.apache.kafka.streams.examples.wordcount;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
@@ -37,7 +35,7 @@ import java.util.concurrent.CountDownLatch;
  * Demonstrates, using the high-level KStream DSL, how to implement the 
WordCount program
  * that computes a simple word occurrence histogram from an input text.
  *
- * In this example, the input stream reads from a topic named 
"streams-file-input", where the values of messages
+ * In this example, the input stream reads from a topic named 
"streams-plaintext-input", where the values of messages
  * represent lines of text; and the histogram output is written to topic 
"streams-wordcount-output" where each record
  * is an updated count of a single word.
  *
@@ -62,7 +60,7 @@ public class WordCountDemo {
 
         StreamsBuilder builder = new StreamsBuilder();
 
-        KStream<String, String> source = 
builder.stream("streams-wordcount-input");
+        KStream<String, String> source = 
builder.stream("streams-plaintext-input");
 
         KTable<String, Long> counts = source
                 .flatMapValues(new ValueMapper<String, Iterable<String>>() {
@@ -70,13 +68,13 @@ public class WordCountDemo {
                     public Iterable<String> apply(String value) {
                         return 
Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
                     }
-                }).map(new KeyValueMapper<String, String, KeyValue<String, 
String>>() {
+                })
+                .groupBy(new KeyValueMapper<String, String, String>() {
                     @Override
-                    public KeyValue<String, String> apply(String key, String 
value) {
-                        return new KeyValue<>(value, value);
+                    public String apply(String key, String value) {
+                        return value;
                     }
                 })
-                .groupByKey()
                 .count("Counts");
 
         // need to override value serde to Long type
@@ -98,8 +96,8 @@ public class WordCountDemo {
             streams.start();
             latch.await();
         } catch (Throwable e) {
-            Exit.exit(1);
+            System.exit(1);
         }
-        Exit.exit(0);
+        System.exit(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 34bb8bb..b0b8be5 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.examples.wordcount;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -40,7 +39,7 @@ import java.util.concurrent.CountDownLatch;
  * Demonstrates, using the low-level Processor APIs, how to implement the 
WordCount program
  * that computes a simple word occurrence histogram from an input text.
  *
- * In this example, the input stream reads from a topic named 
"streams-file-input", where the values of messages
+ * In this example, the input stream reads from a topic named 
"streams-plaintext-input", where the values of messages
  * represent lines of text; and the histogram output is written to topic 
"streams-wordcount-processor-output" where each record
  * is an updated count of a single word.
  *
@@ -121,7 +120,7 @@ public class WordCountProcessorDemo {
 
         Topology builder = new Topology();
 
-        builder.addSource("Source", "streams-wordcount-input");
+        builder.addSource("Source", "streams-plaintext-input");
 
         builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
         
builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(),
 "Process");
@@ -144,8 +143,8 @@ public class WordCountProcessorDemo {
             streams.start();
             latch.await();
         } catch (Throwable e) {
-            Exit.exit(1);
+            System.exit(1);
         }
-        Exit.exit(0);
+        System.exit(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/streams/quickstart/java/pom.xml
----------------------------------------------------------------------
diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml
new file mode 100644
index 0000000..ff0a417
--- /dev/null
+++ b/streams/quickstart/java/pom.xml
@@ -0,0 +1,36 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <parent>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>streams-quickstart</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>streams-quickstart-java</artifactId>
+    <packaging>maven-archetype</packaging>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/streams/quickstart/java/src/main/resources/META-INF/maven/archetype-metadata.xml
----------------------------------------------------------------------
diff --git a/streams/quickstart/java/src/main/resources/META-INF/maven/archetype-metadata.xml b/streams/quickstart/java/src/main/resources/META-INF/maven/archetype-metadata.xml
new file mode 100644
index 0000000..9e0d8bd
--- /dev/null
+++ b/streams/quickstart/java/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -0,0 +1,34 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<archetype-descriptor
+        xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
+        name="streams-quickstart-java">
+    <fileSets>
+        <fileSet filtered="true" packaged="true" encoding="UTF-8">
+            <directory>src/main/java</directory>
+            <includes>
+                <include>**/*.java</include>
+            </includes>
+        </fileSet>
+        <fileSet encoding="UTF-8">
+            <directory>src/main/resources</directory>
+        </fileSet>
+    </fileSets>
+</archetype-descriptor>
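
Since the first fileSet is declared packaged="true", the archetype plugin relocates the sources under the chosen package and substitutes `${package}` on generation. For example, generating with `-Dpackage=myapps` (an arbitrary example value) yields roughly:

    src/main/java/myapps/Pipe.java        <- begins with: package myapps;
    src/main/java/myapps/LineSplit.java
    src/main/java/myapps/WordCount.java

The second fileSet copies src/main/resources into the generated project as-is, without filtering.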

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..8b79e06
--- /dev/null
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,136 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>${groupId}</groupId>
+    <artifactId>${artifactId}</artifactId>
+    <version>${version}</version>
+    <packaging>jar</packaging>
+
+    <name>Kafka Streams Quickstart :: Java</name>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <kafka.version>1.0.0-SNAPSHOT</kafka.version>
+        <slf4j.version>1.7.7</slf4j.version>
+        <log4j.version>1.2.17</log4j.version>
+    </properties>
+
+    <repositories>
+        <repository>
+            <id>apache.snapshots</id>
+            <name>Apache Development Snapshot Repository</name>
+            <url>https://repository.apache.org/content/repositories/snapshots/</url>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+    <!--
+        Execute "mvn clean package"
+        to build a jar file out of this project!
+    -->
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <configuration>
+                        <source>1.8</source>
+                        <target>1.8</target>
+                        <compilerId>jdt</compilerId>
+                    </configuration>
+                    <dependencies>
+                        <dependency>
+                            <groupId>org.eclipse.tycho</groupId>
+                            <artifactId>tycho-compiler-jdt</artifactId>
+                            <version>0.21.0</version>
+                        </dependency>
+                    </dependencies>
+                </plugin>
+                <plugin>
+                    <groupId>org.eclipse.m2e</groupId>
+                    <artifactId>lifecycle-mapping</artifactId>
+                    <version>1.0.0</version>
+                    <configuration>
+                        <lifecycleMappingMetadata>
+                            <pluginExecutions>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-assembly-plugin</artifactId>
+                                        <versionRange>[2.4,)</versionRange>
+                                        <goals>
+                                            <goal>single</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-compiler-plugin</artifactId>
+                                        <versionRange>[3.1,)</versionRange>
+                                        <goals>
+                                            <goal>testCompile</goal>
+                                            <goal>compile</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                            </pluginExecutions>
+                        </lifecycleMappingMetadata>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+
+    <dependencies>
+        <!-- Apache Kafka dependencies -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-streams</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java
----------------------------------------------------------------------
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java
new file mode 100644
index 0000000..ec40d2a
--- /dev/null
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ${package};
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.ValueMapper;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * In this example, we implement a simple LineSplit program using the high-level Streams DSL
+ * that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text;
+ * the code splits each text line into words and then writes them back into a sink topic "streams-linesplit-output" where
+ * each record represents a single word.
+ */
+public class LineSplit {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.<String, String>stream("streams-plaintext-input")
+               .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+                    @Override
+                    public Iterable<String> apply(String value) {
+                        return Arrays.asList(value.split("\\W+"));
+                    }
+                })
+               .to("streams-linesplit-output");
+
+        /* ------- on Java 8 and above, use the lambda code below instead and comment out the block above ----
+
+        builder.stream("streams-plaintext-input")
+               .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
+               .to("streams-linesplit-output");
+
+           ----------------------------------------------------------------- */
+
+
+        final Topology topology = builder.build();
+        final KafkaStreams streams = new KafkaStreams(topology, props);
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
+
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            System.exit(1);
+        }
+        System.exit(0);
+    }
+}
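
When adapting the generated `LineSplit` class, the resulting wiring can be verified before starting the application; a quick sketch to drop into `main()`, assuming the `Topology#describe()` API available on this 1.0.0 line:

    final Topology topology = builder.build();
    // print sources, processors and sinks together with their connected topics
    System.out.println(topology.describe());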

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java
----------------------------------------------------------------------
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java
new file mode 100644
index 0000000..b3152a7
--- /dev/null
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ${package};
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * In this example, we implement a simple Pipe program using the high-level Streams DSL
+ * that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text,
+ * and writes the messages as-is into a sink topic "streams-pipe-output".
+ */
+public class Pipe {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.stream("streams-plaintext-input").to("streams-pipe-output");
+
+        final Topology topology = builder.build();
+        final KafkaStreams streams = new KafkaStreams(topology, props);
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
+
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            System.exit(1);
+        }
+        System.exit(0);
+    }
+}
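
All three generated programs consume the same input topic, streams-plaintext-input. As an alternative to the console producer, a small driver along these lines can feed them; the `InputDriver` class name and the sample lines are made up, and a broker on localhost:9092 with the topic already created is assumed:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class InputDriver {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            // send a couple of text lines; close() flushes pending records
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<String, String>("streams-plaintext-input", "all streams lead to kafka"));
                producer.send(new ProducerRecord<String, String>("streams-plaintext-input", "hello kafka streams"));
            }
        }
    }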

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java
----------------------------------------------------------------------
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java
new file mode 100644
index 0000000..6dafa8c
--- /dev/null
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ${package};
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapper;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * In this example, we implement a simple WordCount program using the high-level Streams DSL
+ * that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text,
+ * splits each text line into words, computes the word occurrence histogram, and writes the continuously updated histogram
+ * into a topic "streams-wordcount-output" where each record is an updated count of a single word.
+ */
+public class WordCount {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.<String, String>stream("streams-plaintext-input")
+               .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+                    @Override
+                    public Iterable<String> apply(String value) {
+                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+                    }
+                })
+               .groupBy(new KeyValueMapper<String, String, String>() {
+                   @Override
+                   public String apply(String key, String value) {
+                       return value;
+                   }
+                })
+               .count("Counts")
+               .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
+
+
+        /* ------- on Java 8 and above, use the lambda code below instead and comment out the block above ----
+
+        builder.stream("streams-plaintext-input")
+               .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+               .groupBy((key, value) -> value)
+               .count("Counts")
+               .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
+
+           ----------------------------------------------------------------- */
+
+        final Topology topology = builder.build();
+        final KafkaStreams streams = new KafkaStreams(topology, props);
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
+
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            System.exit(1);
+        }
+        System.exit(0);
+    }
+}
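
The counts written to streams-wordcount-output are serialized with Serdes.Long(), so a consumer needs the matching value deserializer to read them back. A minimal checker, sketched under the same localhost assumptions; `OutputChecker` and the group id are made up:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.util.Collections;
    import java.util.Properties;

    public class OutputChecker {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-checker");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);

            try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("streams-wordcount-output"));
                while (true) {
                    // poll forever, printing each updated word count
                    ConsumerRecords<String, Long> records = consumer.poll(1000L);
                    for (ConsumerRecord<String, Long> record : records)
                        System.out.println(record.key() + " : " + record.value());
                }
            }
        }
    }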

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/streams/quickstart/java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/resources/log4j.properties b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
new file mode 100644
index 0000000..b620f1b
--- /dev/null
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=INFO, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/streams/quickstart/java/src/test/resources/projects/basic/archetype.properties
----------------------------------------------------------------------
diff --git a/streams/quickstart/java/src/test/resources/projects/basic/archetype.properties b/streams/quickstart/java/src/test/resources/projects/basic/archetype.properties
new file mode 100644
index 0000000..c4a7c16
--- /dev/null
+++ b/streams/quickstart/java/src/test/resources/projects/basic/archetype.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+groupId=org.apache.kafka.archetypetest
+version=0.1
+artifactId=basic
+package=org.apache.kafka.archetypetest

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/streams/quickstart/java/src/test/resources/projects/basic/goal.txt
----------------------------------------------------------------------
diff --git a/streams/quickstart/java/src/test/resources/projects/basic/goal.txt b/streams/quickstart/java/src/test/resources/projects/basic/goal.txt
new file mode 100644
index 0000000..f8808ba
--- /dev/null
+++ b/streams/quickstart/java/src/test/resources/projects/basic/goal.txt
@@ -0,0 +1 @@
+compile
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2774e30/streams/quickstart/pom.xml
----------------------------------------------------------------------
diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml
new file mode 100644
index 0000000..ef1ee0d
--- /dev/null
+++ b/streams/quickstart/pom.xml
@@ -0,0 +1,101 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.kafka</groupId>
+    <artifactId>streams-quickstart</artifactId>
+    <packaging>pom</packaging>
+    <version>1.0.0-SNAPSHOT</version>
+
+    <name>Kafka Streams :: Quickstart</name>
+
+    <parent>
+        <groupId>org.apache</groupId>
+        <artifactId>apache</artifactId>
+        <version>18</version>
+    </parent>
+
+    <modules>
+        <module>java</module>
+    </modules>
+    <build>
+        <extensions>
+            <extension>
+                <groupId>org.apache.maven.archetype</groupId>
+                <artifactId>archetype-packaging</artifactId>
+                <version>2.2</version>
+            </extension>
+        </extensions>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-archetype-plugin</artifactId>
+                    <version>2.2</version>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+        <plugins>
+            <plugin>
+                <artifactId>maven-archetype-plugin</artifactId>
+                <version>2.2</version>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+            <!-- deactivate the shade plugin for the quickstart archetypes -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase/>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>com.github.siom79.japicmp</groupId>
+                <artifactId>japicmp-maven-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+
+            <!-- use alternative delimiter for filtering resources -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-resources-plugin</artifactId>
+                <configuration>
+                    <useDefaultDelimiters>false</useDefaultDelimiters>
+                    <delimiters>
+                        <delimiter>@</delimiter>
+                    </delimiters>
+                </configuration>
+            </plugin>
+        </plugins>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+    </build>
+</project>
\ No newline at end of file
