This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit df8f9a586143bbd719b6e9f03592e02e45629a9a
Author: Seth Wiesman <sjwies...@gmail.com>
AuthorDate: Mon Jul 22 16:01:44 2019 -0500

    [FLINK-12746][docs] Add DataStream API Walkthrough
    
    This closes #9201.
---
 docs/fig/fraud-transactions.svg                    |  71 ++
 .../getting-started/walkthroughs/datastream_api.md | 925 +++++++++++++++++++++
 .../walkthroughs/datastream_api.zh.md              | 925 +++++++++++++++++++++
 docs/getting-started/walkthroughs/table_api.md     |   2 +-
 docs/getting-started/walkthroughs/table_api.zh.md  |   2 +-
 flink-end-to-end-tests/run-nightly-tests.sh        |   2 +
 flink-end-to-end-tests/test-scripts/common.sh      |  12 +
 flink-end-to-end-tests/test-scripts/test_cli.sh    |  11 -
 ...throughs.sh => test_datastream_walkthroughs.sh} |  35 +-
 .../test-scripts/test_table_walkthroughs.sh        |   1 +
 .../flink/walkthrough/common/entity/Alert.java     |  61 ++
 .../flink/walkthrough/common/sink/AlertSink.java   |  43 +
 .../flink-walkthrough-datastream-java/pom.xml      |  37 +
 .../META-INF/maven/archetype-metadata.xml          |  36 +
 .../src/main/resources/archetype-resources/pom.xml | 225 +++++
 .../src/main/java/FraudDetectionJob.java           |  50 ++
 .../src/main/java/FraudDetector.java               |  48 ++
 .../src/main/resources/log4j.properties            |  24 +
 .../flink-walkthrough-datastream-scala/pom.xml     |  37 +
 .../META-INF/maven/archetype-metadata.xml          |  36 +
 .../src/main/resources/archetype-resources/pom.xml | 256 ++++++
 .../src/main/resources/log4j.properties            |  24 +
 .../src/main/scala/FraudDetectionJob.scala         |  51 ++
 .../src/main/scala/FraudDetector.scala             |  49 ++
 flink-walkthroughs/pom.xml                         |   2 +
 25 files changed, 2945 insertions(+), 20 deletions(-)

diff --git a/docs/fig/fraud-transactions.svg b/docs/fig/fraud-transactions.svg
new file mode 100644
index 0000000..f8e59d9
--- /dev/null
+++ b/docs/fig/fraud-transactions.svg
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<svg version="1.1" viewBox="0 0 842.6483154296875 203.27821350097656" 
fill="none" stroke="none" stroke-linecap="square" stroke-miterlimit="10" 
xmlns:xlink="http://www.w3.org/1999/xlink"; xmlns="http://www.w3.org/2000/svg"; 
width="100%" height="100%">
+    <rect id="svgEditorBackground" x="0" y="0" width="842.648" 
height="203.278" style="fill:none;stroke:none;" />
+    <clipPath id="p.0">
+        <path d="m0 0l842.6483 0l0 203.27821l-842.6483 0l0 -203.27821z" 
clip-rule="nonzero" />
+    </clipPath>
+    <g clip-path="url(#p.0)">
+        <path fill="#000000" fill-opacity="0.0" 
d="M0,0l842.6483,0l0,203.27821l-842.6483,0Z" fill-rule="evenodd" />
+        <path fill="#000000" fill-opacity="0.0" d="m203.24672 147.81758l0 
-46.015747l138.67717 0l0 46.015747z" fill-rule="evenodd" />
+        <path fill="#000000" 
d="M265.14047,136.5832q-0.078125,-3,-1.625,-4.890625q-1.546875,-1.90625,-4.234375,-2.53125q-2.703125,-0.625,-9.21876499999999,-0.65625q-6.53125,-0.046875,-8.59375,-0.234375q-3.28125,-0.3593673700000011,-5.265625,-1.3124923999999965q-2,-0.953125,-3.1875,-2.359375q-1.203125,-1.40625,-1.828125,-3.59375q-0.390625,-1.484375,-0.390625,-4.84375l0,-2.1875l6.140625,0l0,1.203125q0,4.0625,1.46875,5.390625q1.453125,1.328125,6.53125,1.328125q10.234375,0,12.93751499999999,
 [...]
+        <path fill="#000000" 
d="M608.6628,138.55434q-0.078125,-3,-1.625,-4.890625q-1.546875,-1.90625,-4.234375,-2.53125q-2.703125,-0.625,-9.21875,-0.65625q-6.53125,-0.046875,-8.59375,-0.234375q-3.28125,-0.359375,-5.265625,-1.3125q-2,-0.953125,-3.1875,-2.359375q-1.203125,-1.40625,-1.828125,-3.59375q-0.390625,-1.484375,-0.390625,-4.84375l0,-2.1875l6.140625,0l0,1.203125q0,4.0625,1.46875,5.390625q1.453125,1.328125,6.53125,1.328125q10.234375,0,12.9375,0.4375q4.171875,0.703125,6.4375,2.40625q2
 [...]
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" 
d="m43.850395 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" 
d="m119.850395 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" 
d="m195.85039 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" 
d="m271.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" 
d="m347.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" 
d="m423.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" 
d="m499.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" 
d="m575.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" 
d="m651.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" 
d="m727.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" 
d="m803.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" 
d="m43.351707 19.552494l760.9974 0" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" 
d="m43.351707 54.749344l760.9974 0" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" 
d="m43.351707 94.749344l760.9974 0" fill-rule="nonzero" />
+        <path fill="#000000" d="m68.63684 41.950916l0 -8.421875l-3.140625 0l0 
-1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 
-3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 
0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 
3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 
2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 
2.1875 -1.140625q0.625 0 [...]
+        <path fill="#000000" d="m144.63684 41.950916l0 -8.421875l-3.140625 0l0 
-1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 
-3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 
0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 
3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 
2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 
2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m220.63684 41.950916l0 -8.421875l-3.140625 0l0 
-1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 
-3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 
0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 
3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 
2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 
2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m296.63684 41.950916l0 -8.421875l-3.140625 0l0 
-1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 
-3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 
0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 
3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 
2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 
2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m372.63684 41.950916l0 -8.421875l-3.140625 0l0 
-1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 
-3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 
0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 
3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 
2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 
2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m448.63684 41.950916l0 -8.421875l-3.140625 0l0 
-1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 
-3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 
0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 
3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 
2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 
2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m524.63684 41.950916l0 -8.421875l-3.140625 0l0 
-1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 
-3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 
0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 
3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 
2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 
2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m600.63684 41.950916l0 -8.421875l-3.140625 0l0 
-1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 
-3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 
0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 
3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 
2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 
2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m676.63684 41.950916l0 -8.421875l-3.140625 0l0 
-1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 
-3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 
0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 
3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 
2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 
2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m748.9306 41.950916l0 -8.421875l-3.140625 0l0 
-1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 
-3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 
0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 
3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 
2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 
2.1875 -1.140625q0.625 0 [...]
+        <path fill="#000000" d="m64.78021 77.92434l0 -1.171875q-0.875 
-0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 
-0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 
1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 
-1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 
-1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 
-0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...]
+        <path fill="#000000" d="m140.78021 77.92434l0 -1.171875q-0.875 
-0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 
-0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 
1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 
-1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 
-1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 
-0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+        <path fill="#000000" d="m220.48645 77.92434l0 -1.171875q-0.875 
-0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 
-0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 
1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 
-1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 
-1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 
-0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+        <path fill="#000000" d="m289.07397 77.92434l0 -1.171875q-0.875 
-0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 
-0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 
1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 
-1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 
-1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 
-0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+        <path fill="#000000" d="m365.07397 77.92434l0 -1.171875q-0.875 
-0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 
-0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 
1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 
-1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 
-1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 
-0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+        <path fill="#000000" d="m444.7802 77.92434l0 -1.171875q-0.875 
-0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 
-0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 
1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 
-1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 
-1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 
-0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...]
+        <path fill="#000000" d="m524.48645 77.92434l0 -1.171875q-0.875 
-0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 
-0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 
1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 
-1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 
-1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 
-0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+        <path fill="#000000" d="m596.7802 77.92434l0 -1.171875q-0.875 
-0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 
-0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 
1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 
-1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 
-1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 
-0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...]
+        <path fill="#000000" d="m669.074 77.92434l0 -1.171875q-0.875 -0.109375 
-1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 
-0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 
0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 
-0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 
-2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 
0.5625q1.015625 0.09375 1.609375 [...]
+        <path fill="#000000" d="m748.7802 77.92434l0 -1.171875q-0.875 
-0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 
-0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 
1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 
-1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 
-1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 
-0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...]
+        <path fill="#000000" fill-opacity="0.0" d="m236.08136 
158.07217l73.00787 0l0 68.28346l-73.00787 0z" fill-rule="evenodd" />
+        <path fill="#000000" d="m246.61261 184.99217l0 -13.359375l9.015625 0l0 
1.578125l-7.25 0l0 4.140625l6.265625 0l0 1.578125l-6.265625 0l0 
6.0625l-1.765625 0zm11.083496 0l0 -9.671875l1.46875 0l0 1.46875q0.5625 -1.03125 
1.03125 -1.359375q0.484375 -0.328125 1.0625 -0.328125q0.828125 0 1.6875 
0.53125l-0.5625 1.515625q-0.609375 -0.359375 -1.203125 -0.359375q-0.546875 0 
-0.96875 0.328125q-0.421875 0.328125 -0.609375 0.890625q-0.28125 0.875 -0.28125 
1.921875l0 5.0625l-1.625 0zm12.540802 -1 [...]
+        <path fill="#000000" fill-opacity="0.0" d="m563.58795 
160.0433l105.03937 0l0 68.28346l-105.03937 0z" fill-rule="evenodd" />
+        <path fill="#000000" d="m574.0098 186.9633l0 -13.359375l1.8125 
0l7.015625 10.484375l0 -10.484375l1.6875 0l0 13.359375l-1.8125 0l-7.015625 
-10.5l0 10.5l-1.6875 0zm12.676025 -4.84375q0 -2.6875 1.484375 -3.96875q1.25 
-1.078125 3.046875 -1.078125q2.0 0 3.265625 1.3125q1.265625 1.296875 1.265625 
3.609375q0 1.859375 -0.5625 2.9375q-0.5625 1.0625 -1.640625 1.65625q-1.0625 
0.59375 -2.328125 0.59375q-2.03125 0 -3.28125 -1.296875q-1.25 -1.3125 -1.25 
-3.765625zm1.6875 0q0 1.859375 0.796875  [...]
+    </g>
+</svg>
diff --git a/docs/getting-started/walkthroughs/datastream_api.md 
b/docs/getting-started/walkthroughs/datastream_api.md
new file mode 100644
index 0000000..8676ae4
--- /dev/null
+++ b/docs/getting-started/walkthroughs/datastream_api.md
@@ -0,0 +1,925 @@
+---
+title: "DataStream API"
+nav-id: datastreamwalkthrough
+nav-title: 'DataStream API'
+nav-parent_id: walkthroughs
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Apache Flink offers a DataStream API for building robust, stateful streaming 
applications.
+It provides fine-grained control over state and time, which allows for the 
implementation of advanced event-driven systems.
+In this step-by-step guide you'll learn how to build a stateful streaming 
application with Flink's DataStream API.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are You Building? 
+
+Credit card fraud is a growing concern in the digital age.
+Criminals steal credit card numbers by running scams or hacking into insecure 
systems.
+Stolen numbers are tested by making one or more small purchases, often for a 
dollar or less.
+If that works, they then make more significant purchases to get items they can 
sell or keep for themselves.
+
+In this tutorial, you will build a fraud detection system for alerting on 
suspicious credit card transactions.
+Using a simple set of rules, you will see how Flink allows us to implement 
advanced business logic and act in real-time.
+
+## Prerequisites
+
+This walkthrough assumes that you have some familiarity with Java or Scala, 
but you should be able to follow along even if you are coming from a different 
programming language.
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support 
resources](https://flink.apache.org/gettinghelp.html).
+In particular, Apache Flink's [user mailing 
list](https://flink.apache.org/community.html#mailing-lists) is consistently 
ranked as one of the most active of any Apache project and a great way to get 
help quickly.
+
+## How to Follow Along
+
+If you want to follow along, you will require a computer with:
+
+* Java 8 
+* Maven 
+
+A provided Flink Maven Archetype will create a skeleton project with all the 
necessary dependencies quickly, so you only need to focus on filling out the 
business logic.
+These dependencies include `flink-streaming-java` which is the core dependency 
for all Flink streaming applications and `flink-walkthrough-common` that has 
data generators and other classes specific to this walkthrough.
+
+{% panel **Note:** Each code block within this walkthrough may not contain the 
full surrounding class for brevity. The full code is available [at the bottom 
of the page](#final-application). %}
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-java \{% unless 
site.is_stable %}
+    
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=frauddetection \
+    -DartifactId=frauddetection \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-scala \{% unless 
site.is_stable %}
+    
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=frauddetection \
+    -DartifactId=frauddetection \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+</div>
+
+{% unless site.is_stable %}
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+    <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify 
the repository (-DarchetypeCatalog) via the commandline. If you wish to use the 
snapshot repository, you need to add a repository entry to your settings.xml. 
For details about this change, please refer to <a 
href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html";>Maven
 official document</a>
+</p>
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the 
above parameters,
+Maven will create a folder named `frauddetection` that contains a project with 
all the dependencies to complete this tutorial.
+After importing the project into your editor, you can find a file 
`FraudDetectionJob.java` (or `FraudDetectionJob.scala`) with the following code 
which you can run directly inside your IDE.
+Try setting breakpoints throughout the data stream and run the code in DEBUG 
mode to get a feeling for how everything works.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+#### FraudDetectionJob.java
+
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.walkthrough.common.sink.AlertSink;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+import org.apache.flink.walkthrough.common.source.TransactionSource;
+
+public class FraudDetectionJob {
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<Transaction> transactions = env
+            .addSource(new TransactionSource())
+            .name("transactions");
+        
+        DataStream<Alert> alerts = transactions
+            .keyBy(Transaction::getAccountId)
+            .process(new FraudDetector())
+            .name("fraud-detector");
+
+        alerts
+            .addSink(new AlertSink())
+            .name("send-alerts");
+
+        env.execute("Fraud Detection");
+    }
+}
+{% endhighlight %}
+
+#### FraudDetector.java
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, 
Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        Alert alert = new Alert();
+        alert.setId(transaction.getAccountId());
+
+        collector.collect(alert);
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+#### FraudDetectionJob.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.walkthrough.common.sink.AlertSink
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+import org.apache.flink.walkthrough.common.source.TransactionSource
+
+object FraudDetectionJob {
+
+  @throws[Exception]
+  def main(args: Array[String]): Unit = {
+    val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
+
+    val transactions: DataStream[Transaction] = env
+      .addSource(new TransactionSource)
+      .name("transactions")
+
+    val alerts: DataStream[Alert] = transactions
+      .keyBy(transaction => transaction.getAccountId)
+      .process(new FraudDetector)
+      .name("fraud-detector")
+
+    alerts
+      .addSink(new AlertSink)
+      .name("send-alerts")
+
+    env.execute("Fraud Detection")
+  }
+}
+{% endhighlight %}
+
+#### FraudDetector.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @throws[Exception]
+  def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    val alert = new Alert
+    alert.setId(transaction.getAccountId)
+
+    collector.collect(alert)
+  }
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+## Breaking Down the Code
+
+Let's walk step-by-step through the code of these two files. The 
`FraudDetectionJob` class defines the data flow of the application and the 
`FraudDetector` class defines the business logic of the function that detects 
fraudulent transactions.
+
+We start describing how the Job is assembled in the `main` method of the 
`FraudDetectionJob` class.
+
+#### The Execution Environment
+
+The first line sets up your `StreamExecutionEnvironment`.
+The execution environment is how you set properties for your Job, create your 
sources, and finally trigger the execution of the Job.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
+{% endhighlight %}
+</div>
+</div>
+
+#### Creating a Source
+
+Sources ingest data from external systems, such as Apache Kafka, RabbitMQ, or 
Apache Pulsar, into Flink Jobs.
+This walkthrough uses a source that generates an infinite stream of credit 
card transactions for you to process.
+Each transaction contains an account ID (`accountId`), timestamp (`timestamp`) 
of when the transaction occurred, and US$ amount (`amount`).
+The `name` attached to the source is just for debugging purposes, so if 
something goes wrong, we will know where the error originated.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Transaction> transactions = env
+    .addSource(new TransactionSource())
+    .name("transactions");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val transactions: DataStream[Transaction] = env
+  .addSource(new TransactionSource)
+  .name("transactions")
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Partitioning Events & Detecting Fraud
+
+The `transactions` stream contains a lot of transactions from a large number 
of users, such that it needs to be processed in parallel by multiple fraud 
detection tasks. Since fraud occurs on a per-account basis, you must ensure 
that all transactions for the same account are processed by the same parallel 
task of the fraud detector operator.
+
+To ensure that the same physical task processes all records for a particular 
key, you can partition a stream using `DataStream#keyBy`. 
+The `process()` call adds an operator that applies a function to each 
partitioned element in the stream.
+It is common to say the operator immediately after a `keyBy`, in this case 
`FraudDetector`, is executed within a _keyed context_.
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Alert> alerts = transactions
+    .keyBy(Transaction::getAccountId)
+    .process(new FraudDetector())
+    .name("fraud-detector");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val alerts: DataStream[Alert] = transactions
+  .keyBy(transaction => transaction.getAccountId)
+  .process(new FraudDetector)
+  .name("fraud-detector")
+{% endhighlight %}
+</div>
+</div>
+
+#### Outputting Results
+ 
+A sink writes a `DataStream` to an external system, such as Apache Kafka, 
Cassandra, and AWS Kinesis.
+The `AlertSink` logs each `Alert` record with log level **INFO**, instead of 
writing it to persistent storage, so you can easily see your results.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+alerts.addSink(new AlertSink());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+alerts.addSink(new AlertSink)
+{% endhighlight %}
+</div>
+</div>
+
+#### Executing the Job
+
+Flink applications are built lazily and shipped to the cluster for execution 
only once fully formed.
+Call `StreamExecutionEnvironment#execute` to begin the execution of our Job 
and give it a name.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.execute("Fraud Detection");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+env.execute("Fraud Detection")
+{% endhighlight %}
+</div>
+</div>
+
+#### The Fraud Detector
+
+The fraud detector is implemented as a `KeyedProcessFunction`.
+Its method `KeyedProcessFunction#processElement` is called for every 
transaction event.
+This first version produces an alert on every transaction, which some may say 
is overly conservative.
+
+The next steps of this tutorial will guide you to expand the fraud detector 
with more meaningful business logic.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, 
Alert> {
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+  
+        Alert alert = new Alert();
+        alert.setId(transaction.getAccountId());
+
+        collector.collect(alert);
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @throws[Exception]
+  def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    val alert = new Alert
+    alert.setId(transaction.getAccountId)
+
+    collector.collect(alert)
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+## Writing a Real Application (v1)
+
+For the first version, the fraud detector should output an alert for any 
account that makes a small transaction immediately followed by a large one. 
Where small is anything less than $1.00 and large is more than $500.
+Imagine your fraud detector processes the following stream of transactions for 
a particular account.
+
+<p class="text-center">
+    <img alt="Transactions" width="80%" src="{{ site.baseurl 
}}/fig/fraud-transactions.svg"/>
+</p>
+
+Transactions 3 and 4 should be marked as fraudulent because a small transaction, $0.09, is immediately followed by a large one, $510.
+In contrast, transactions 7, 8, and 9 are not fraudulent because the small amount of $0.02 is not immediately followed by the large one; instead, there is an intermediate transaction that breaks the pattern.
+
+To do this, the fraud detector must _remember_ information across events; a 
large transaction is only fraudulent if the previous one was small.
+Remembering information across events requires [state]({{ site.baseurl 
}}/concepts/glossary.html#managed-state), and that is why we decided to use a 
[KeyedProcessFunction]({{ site.baseurl 
}}/dev/stream/operators/process_function.html). 
+It provides fine-grained control over both state and time, which will allow us 
to evolve our algorithm with more complex requirements throughout this 
walkthrough.
+
+The most straightforward implementation is a boolean flag that is set whenever 
a small transaction is processed.
+When a large transaction comes through, you can simply check if the flag is 
set for that account.
+
+However, merely implementing the flag as a member variable in the 
`FraudDetector` class will not work. 
+Flink processes the transactions of multiple accounts with the same object 
instance of `FraudDetector`, which means if accounts A and B are routed through 
the same instance of `FraudDetector`, a transaction for account A could set the 
flag to true and then a transaction for account B could set off a false alert. 
+We could of course use a data structure like a `Map` to keep track of the flags for individual keys; however, a simple member variable would not be fault-tolerant and all its information would be lost in case of a failure.
+Hence, the fraud detector would possibly miss alerts if the application ever 
had to restart to recover from a failure.
+
+To address these challenges, Flink provides primitives for fault-tolerant 
state that are almost as easy to use as regular member variables.
+
+The most basic type of state in Flink is [ValueState]({{ site.baseurl 
}}/dev/stream/state/state.html#using-managed-keyed-state), a data type that 
adds fault tolerance to any variable it wraps.
+`ValueState` is a form of _keyed state_, meaning it is only available in 
operators that are applied in a _keyed context_; any operator immediately 
following `DataStream#keyBy`.
+A _keyed state_ of an operator is automatically scoped to the key of the 
record that is currently processed.
+In this example, the key is the account id for the current transaction (as 
declared by `keyBy()`), and `FraudDetector` maintains an independent state for 
each account. 
+`ValueState` is created using a `ValueStateDescriptor` which contains metadata 
about how Flink should manage the variable. The state should be registered 
before the function starts processing data.
+The right hook for this is the `open()` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, 
Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient ValueState<Boolean> flagState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new 
ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+  }
+{% endhighlight %}
+</div>
+</div>
+
+`ValueState` is a wrapper class, similar to `AtomicReference` or `AtomicLong` 
in the Java standard library.
+It provides three methods for interacting with its contents; `update` sets the 
state, `value` gets the current value, and `clear` deletes its contents.
+If the state for a particular key is empty, such as at the beginning of an 
application or after calling `ValueState#clear`, then `ValueState#value` will 
return `null`.
+Modifications to the object returned by `ValueState#value` are not guaranteed 
to be recognized by the system, and so all changes must be performed with 
`ValueState#update`.
+Otherwise, fault tolerance is managed automatically by Flink under the hood, 
and so you can interact with it like with any standard variable.
+
+Below, you can see an example of how you can use a flag state to track 
potential fraudulent transactions.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        // Get the current state for the current key
+        Boolean lastTransactionWasSmall = flagState.value();
+
+        // Check if the flag is set
+        if (lastTransactionWasSmall != null) {
+            if (transaction.getAmount() > LARGE_AMOUNT) {
+                // Output an alert downstream
+                Alert alert = new Alert();
+                alert.setId(transaction.getAccountId());
+
+                collector.collect(alert);            
+            }
+
+            // Clean up our state
+            flagState.clear();
+        }
+
+        if (transaction.getAmount() < SMALL_AMOUNT) {
+            // Set the flag to true
+            flagState.update(true);
+        }
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+  override def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    // Get the current state for the current key
+    val lastTransactionWasSmall = flagState.value
+
+    // Check if the flag is set
+    if (lastTransactionWasSmall != null) {
+      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+        // Output an alert downstream
+        val alert = new Alert
+        alert.setId(transaction.getAccountId)
+
+        collector.collect(alert)
+      }
+      // Clean up our state
+      flagState.clear()
+    }
+
+    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+      // set the flag to true
+      flagState.update(true)
+    }
+  }
+{% endhighlight %}
+</div>
+</div>
+
+For every transaction, the fraud detector checks the state of the flag for 
that account.
+Remember, `ValueState` is always scoped to the current key, i.e., account.
+If the flag is non-null, then the last transaction seen for that account was 
small, and so if the amount for this transaction is large, then the detector 
outputs a fraud alert.
+
+After that check, the flag state is unconditionally cleared.
+Either the current transaction caused a fraud alert, and the pattern is over, 
or the current transaction did not cause an alert, and the pattern is broken 
and needs to be restarted.
+
+Finally, the transaction amount is checked to see if it is small.
+If so, then the flag is set so that it can be checked by the next event.
+Notice that `ValueState<Boolean>` actually has three states, unset (`null`), `true`, and `false`, because all `ValueState`s are nullable.
+This job only makes use of unset (`null`) and `true` to check whether the flag is set or not.
+
+## Fraud Detector v2: State + Time = &#10084;&#65039;
+
+Scammers don't wait long to make their large purchase, in order to reduce the chances that their test transaction is noticed.
+For example, suppose you wanted to set a 1 minute timeout to your fraud 
detector; i.e., in the previous example transactions 3 and 4 would only be 
considered fraud if they occurred within 1 minute of each other.
+Flink's `KeyedProcessFunction` allows you to set timers which invoke a 
callback method at some point in time in the future.
+
+Let's see how we can modify our Job to comply with our new requirements:
+
+* Whenever the flag is set to `true`, also set a timer for 1 minute in the 
future.
+* When the timer fires, reset the flag by clearing its state.
+* If the flag is ever cleared the timer should be canceled.
+
+To cancel a timer, you have to remember what time it is set for, and 
remembering implies state, so you will begin by creating a timer state along 
with your flag state.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+    private transient ValueState<Boolean> flagState;
+    private transient ValueState<Long> timerState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new 
ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+
+        ValueStateDescriptor<Long> timerDescriptor = new 
ValueStateDescriptor<>(
+                "timer-state",
+                Types.LONG);
+        timerState = getRuntimeContext().getState(timerDescriptor);
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+  @transient private var timerState: ValueState[java.lang.Long] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+
+    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+    timerState = getRuntimeContext.getState(timerDescriptor)
+  }
+{% endhighlight %}
+</div>
+</div>
+
+`KeyedProcessFunction#processElement` is called with a `Context` that contains 
a timer service.
+The timer service can be used to query the current time, register timers, and 
delete timers.
+With this, you can set a timer for 1 minute in the future every time the flag 
is set and store the timestamp in `timerState`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+if (transaction.getAmount() < SMALL_AMOUNT) {
+    // set the flag to true
+    flagState.update(true);
+
+    // set the timer and timer state
+    long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
+    context.timerService().registerProcessingTimeTimer(timer);
+    timerState.update(timer);
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+  // set the flag to true
+  flagState.update(true)
+
+  // set the timer and timer state
+  val timer = context.timerService.currentProcessingTime + 
FraudDetector.ONE_MINUTE
+  context.timerService.registerProcessingTimeTimer(timer)
+  timerState.update(timer)
+}
+{% endhighlight %}
+</div>
+</div>
+
+Processing time is wall clock time, and is determined by the system clock of 
the machine running the operator. 
+
+When a timer fires, it calls `KeyedProcessFunction#onTimer`. 
+Overriding this method is how you can implement your callback to reset the 
flag.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
+    // remove flag after 1 minute
+    timerState.clear();
+    flagState.clear();
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+override def onTimer(
+    timestamp: Long,
+    ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+    out: Collector[Alert]): Unit = {
+  // remove flag after 1 minute
+  timerState.clear()
+  flagState.clear()
+}
+{% endhighlight %}
+</div>
+</div>
+
+Finally, to cancel the timer, you need to delete the registered timer and 
delete the timer state.
+You can wrap this in a helper method and call this method instead of 
`flagState.clear()`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+private void cleanUp(Context ctx) throws Exception {
+    // delete timer
+    Long timer = timerState.value();
+    ctx.timerService().deleteProcessingTimeTimer(timer);
+
+    // clean up all state
+    timerState.clear();
+    flagState.clear();
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@throws[Exception]
+private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, 
Alert]#Context): Unit = {
+  // delete timer
+  val timer = timerState.value
+  ctx.timerService.deleteProcessingTimeTimer(timer)
+
+  // clean up all states
+  timerState.clear()
+  flagState.clear()
+}
+{% endhighlight %}
+</div>
+</div>
+
+And that's it, a fully functional, stateful, distributed streaming application!
+
+## Final Application
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, 
Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    private transient ValueState<Boolean> flagState;
+    private transient ValueState<Long> timerState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new 
ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+
+        ValueStateDescriptor<Long> timerDescriptor = new 
ValueStateDescriptor<>(
+                "timer-state",
+                Types.LONG);
+        timerState = getRuntimeContext().getState(timerDescriptor);
+    }
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        // Get the current state for the current key
+        Boolean lastTransactionWasSmall = flagState.value();
+
+        // Check if the flag is set
+        if (lastTransactionWasSmall != null) {
+            if (transaction.getAmount() > LARGE_AMOUNT) {
+                //Output an alert downstream
+                Alert alert = new Alert();
+                alert.setId(transaction.getAccountId());
+
+                collector.collect(alert);
+            }
+            // Clean up our state
+            cleanUp(context);
+        }
+
+        if (transaction.getAmount() < SMALL_AMOUNT) {
+            // set the flag to true
+            flagState.update(true);
+
+            long timer = context.timerService().currentProcessingTime() + 
ONE_MINUTE;
+            context.timerService().registerProcessingTimeTimer(timer);
+
+            timerState.update(timer);
+        }
+    }
+
+    @Override
+    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> 
out) {
+        // remove flag after 1 minute
+        timerState.clear();
+        flagState.clear();
+    }
+
+    private void cleanUp(Context ctx) throws Exception {
+        // delete timer
+        Long timer = timerState.value();
+        ctx.timerService().deleteProcessingTimeTimer(timer);
+
+        // clean up all state
+        timerState.clear();
+        flagState.clear();
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.scala.typeutils.Types
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+  @transient private var timerState: ValueState[java.lang.Long] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+
+    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+    timerState = getRuntimeContext.getState(timerDescriptor)
+  }
+
+  override def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    // Get the current state for the current key
+    val lastTransactionWasSmall = flagState.value
+
+    // Check if the flag is set
+    if (lastTransactionWasSmall != null) {
+      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+        // Output an alert downstream
+        val alert = new Alert
+        alert.setId(transaction.getAccountId)
+
+        collector.collect(alert)
+      }
+      // Clean up our state
+      cleanUp(context)
+    }
+
+    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+      // set the flag to true
+      flagState.update(true)
+      val timer = context.timerService.currentProcessingTime + 
FraudDetector.ONE_MINUTE
+
+      context.timerService.registerProcessingTimeTimer(timer)
+      timerState.update(timer)
+    }
+  }
+
+  override def onTimer(
+      timestamp: Long,
+      ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+      out: Collector[Alert]): Unit = {
+    // remove flag after 1 minute
+    timerState.clear()
+    flagState.clear()
+  }
+
+  @throws[Exception]
+  private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, 
Alert]#Context): Unit = {
+    // delete timer
+    val timer = timerState.value
+    ctx.timerService.deleteProcessingTimeTimer(timer)
+
+    // clean up all states
+    timerState.clear()
+    flagState.clear()
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+### Expected Output
+
+Running this code with the provided `TransactionSource` will emit fraud alerts 
for account 3.
+You should see the following output in your task manager logs: 
+
+{% highlight bash %}
+2019-08-19 14:22:06,220 INFO  
org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:11,383 INFO  
org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:16,551 INFO  
org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:21,723 INFO  
org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:26,896 INFO  
org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+{% endhighlight %}
diff --git a/docs/getting-started/walkthroughs/datastream_api.zh.md 
b/docs/getting-started/walkthroughs/datastream_api.zh.md
new file mode 100644
index 0000000..8676ae4
--- /dev/null
+++ b/docs/getting-started/walkthroughs/datastream_api.zh.md
@@ -0,0 +1,925 @@
+---
+title: "DataStream API"
+nav-id: datastreamwalkthrough
+nav-title: 'DataStream API'
+nav-parent_id: walkthroughs
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Apache Flink offers a DataStream API for building robust, stateful streaming 
applications.
+It provides fine-grained control over state and time, which allows for the 
implementation of advanced event-driven systems.
+In this step-by-step guide you'll learn how to build a stateful streaming 
application with Flink's DataStream API.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are You Building? 
+
+Credit card fraud is a growing concern in the digital age.
+Criminals steal credit card numbers by running scams or hacking into insecure 
systems.
+Stolen numbers are tested by making one or more small purchases, often for a 
dollar or less.
+If that works, they then make more significant purchases to get items they can 
sell or keep for themselves.
+
+In this tutorial, you will build a fraud detection system for alerting on 
suspicious credit card transactions.
+Using a simple set of rules, you will see how Flink allows us to implement 
advanced business logic and act in real-time.
+
+## Prerequisites
+
+This walkthrough assumes that you have some familiarity with Java or Scala, 
but you should be able to follow along even if you are coming from a different 
programming language.
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support 
resources](https://flink.apache.org/gettinghelp.html).
+In particular, Apache Flink's [user mailing 
list](https://flink.apache.org/community.html#mailing-lists) is consistently 
ranked as one of the most active of any Apache project and a great way to get 
help quickly.
+
+## How to Follow Along
+
+If you want to follow along, you will require a computer with:
+
+* Java 8 
+* Maven 
+
+A provided Flink Maven Archetype will create a skeleton project with all the 
necessary dependencies quickly, so you only need to focus on filling out the 
business logic.
+These dependencies include `flink-streaming-java` which is the core dependency 
for all Flink streaming applications and `flink-walkthrough-common` that has 
data generators and other classes specific to this walkthrough.
+
+{% panel **Note:** Each code block within this walkthrough may not contain the 
full surrounding class for brevity. The full code is available [at the bottom 
of the page](#final-application). %}
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-java \{% unless 
site.is_stable %}
+    
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=frauddetection \
+    -DartifactId=frauddetection \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-scala \{% unless 
site.is_stable %}
+    
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=frauddetection \
+    -DartifactId=frauddetection \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+</div>
+
+{% unless site.is_stable %}
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+    <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify 
the repository (-DarchetypeCatalog) via the commandline. If you wish to use the 
snapshot repository, you need to add a repository entry to your settings.xml. 
For details about this change, please refer to <a 
href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html";>Maven
 official document</a>
+</p>
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the 
above parameters,
+Maven will create a folder named `frauddetection` that contains a project with 
all the dependencies to complete this tutorial.
+After importing the project into your editor, you can find a file 
`FraudDetectionJob.java` (or `FraudDetectionJob.scala`) with the following code 
which you can run directly inside your IDE.
+Try setting breakpoints throughout the data stream and run the code in DEBUG mode to get a feeling for how everything works.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+#### FraudDetectionJob.java
+
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.walkthrough.common.sink.AlertSink;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+import org.apache.flink.walkthrough.common.source.TransactionSource;
+
+public class FraudDetectionJob {
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<Transaction> transactions = env
+            .addSource(new TransactionSource())
+            .name("transactions");
+        
+        DataStream<Alert> alerts = transactions
+            .keyBy(Transaction::getAccountId)
+            .process(new FraudDetector())
+            .name("fraud-detector");
+
+        alerts
+            .addSink(new AlertSink())
+            .name("send-alerts");
+
+        env.execute("Fraud Detection");
+    }
+}
+{% endhighlight %}
+
+#### FraudDetector.java
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, 
Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        Alert alert = new Alert();
+        alert.setId(transaction.getAccountId());
+
+        collector.collect(alert);
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+#### FraudDetectionJob.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.walkthrough.common.sink.AlertSink
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+import org.apache.flink.walkthrough.common.source.TransactionSource
+
+object FraudDetectionJob {
+
+  @throws[Exception]
+  def main(args: Array[String]): Unit = {
+    val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
+
+    val transactions: DataStream[Transaction] = env
+      .addSource(new TransactionSource)
+      .name("transactions")
+
+    val alerts: DataStream[Alert] = transactions
+      .keyBy(transaction => transaction.getAccountId)
+      .process(new FraudDetector)
+      .name("fraud-detector")
+
+    alerts
+      .addSink(new AlertSink)
+      .name("send-alerts")
+
+    env.execute("Fraud Detection")
+  }
+}
+{% endhighlight %}
+
+#### FraudDetector.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @throws[Exception]
+  def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    val alert = new Alert
+    alert.setId(transaction.getAccountId)
+
+    collector.collect(alert)
+  }
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+## Breaking Down the Code
+
+Let's walk step-by-step through the code of these two files. The 
`FraudDetectionJob` class defines the data flow of the application and the 
`FraudDetector` class defines the business logic of the function that detects 
fraudulent transactions.
+
+We start by describing how the Job is assembled in the `main` method of the 
`FraudDetectionJob` class.
+
+#### The Execution Environment
+
+The first line sets up your `StreamExecutionEnvironment`.
+The execution environment is how you set properties for your Job, create your 
sources, and finally trigger the execution of the Job.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
+{% endhighlight %}
+</div>
+</div>
+
+#### Creating a Source
+
+Sources ingest data from external systems, such as Apache Kafka, Rabbit MQ, or 
Apache Pulsar, into Flink Jobs.
+This walkthrough uses a source that generates an infinite stream of credit 
card transactions for you to process.
+Each transaction contains an account ID (`accountId`), timestamp (`timestamp`) 
of when the transaction occurred, and US$ amount (`amount`).
+The `name` attached to the source is just for debugging purposes, so if 
something goes wrong, we will know where the error originated.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Transaction> transactions = env
+    .addSource(new TransactionSource())
+    .name("transactions");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val transactions: DataStream[Transaction] = env
+  .addSource(new TransactionSource)
+  .name("transactions")
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Partitioning Events & Detecting Fraud
+
+The `transactions` stream contains a lot of transactions from a large number 
of users, such that it needs to be processed in parallel by multiple fraud 
detection tasks. Since fraud occurs on a per-account basis, you must ensure 
that all transactions for the same account are processed by the same parallel 
task of the fraud detector operator.
+
+To ensure that the same physical task processes all records for a particular 
key, you can partition a stream using `DataStream#keyBy`. 
+The `process()` call adds an operator that applies a function to each 
partitioned element in the stream.
+It is common to say the operator immediately after a `keyBy`, in this case 
`FraudDetector`, is executed within a _keyed context_.
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Alert> alerts = transactions
+    .keyBy(Transaction::getAccountId)
+    .process(new FraudDetector())
+    .name("fraud-detector");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val alerts: DataStream[Alert] = transactions
+  .keyBy(transaction => transaction.getAccountId)
+  .process(new FraudDetector)
+  .name("fraud-detector")
+{% endhighlight %}
+</div>
+</div>
+
+#### Outputting Results
+ 
+A sink writes a `DataStream` to an external system, such as Apache Kafka, 
Cassandra, and AWS Kinesis.
+The `AlertSink` logs each `Alert` record with log level **INFO**, instead of 
writing it to persistent storage, so you can easily see your results.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+alerts.addSink(new AlertSink());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+alerts.addSink(new AlertSink)
+{% endhighlight %}
+</div>
+</div>
+
+#### Executing the Job
+
+Flink applications are built lazily and shipped to the cluster for execution 
only once fully formed.
+Call `StreamExecutionEnvironment#execute` to begin the execution of our Job 
and give it a name.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.execute("Fraud Detection");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+env.execute("Fraud Detection")
+{% endhighlight %}
+</div>
+</div>
+
+#### The Fraud Detector
+
+The fraud detector is implemented as a `KeyedProcessFunction`.
+Its method `KeyedProcessFunction#processElement` is called for every 
transaction event.
+This first version produces an alert on every transaction, which some may say 
is overly conservative.
+
+The next steps of this tutorial will guide you to expand the fraud detector 
with more meaningful business logic.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, 
Alert> {
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+  
+        Alert alert = new Alert();
+        alert.setId(transaction.getAccountId());
+
+        collector.collect(alert);
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @throws[Exception]
+  def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    val alert = new Alert
+    alert.setId(transaction.getAccountId)
+
+    collector.collect(alert)
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+## Writing a Real Application (v1)
+
+For the first version, the fraud detector should output an alert for any 
account that makes a small transaction immediately followed by a large one, 
where small is anything less than $1.00 and large is more than $500.
+Imagine your fraud detector processes the following stream of transactions for 
a particular account.
+
+<p class="text-center">
+    <img alt="Transactions" width="80%" src="{{ site.baseurl 
}}/fig/fraud-transactions.svg"/>
+</p>
+
+Transactions 3 and 4 should be marked as fraudulent because it is a small 
transaction, $0.09, followed by a large one, $510.
+Alternatively, transactions 7, 8, and 9 are not fraud because the small amount 
of $0.02 is not immediately followed by the large one; instead, there is an 
intermediate transaction that breaks the pattern.
+
+To do this, the fraud detector must _remember_ information across events; a 
large transaction is only fraudulent if the previous one was small.
+Remembering information across events requires [state]({{ site.baseurl 
}}/concepts/glossary.html#managed-state), and that is why we decided to use a 
[KeyedProcessFunction]({{ site.baseurl 
}}/dev/stream/operators/process_function.html). 
+It provides fine-grained control over both state and time, which will allow us 
to evolve our algorithm with more complex requirements throughout this 
walkthrough.
+
+The most straightforward implementation is a boolean flag that is set whenever 
a small transaction is processed.
+When a large transaction comes through, you can simply check if the flag is 
set for that account.
+
+However, merely implementing the flag as a member variable in the 
`FraudDetector` class will not work. 
+Flink processes the transactions of multiple accounts with the same object 
instance of `FraudDetector`, which means if accounts A and B are routed through 
the same instance of `FraudDetector`, a transaction for account A could set the 
flag to true and then a transaction for account B could set off a false alert. 
+We could of course use a data structure like a `Map` to keep track of the 
flags for individual keys, however, a simple member variable would not be 
fault-tolerant and all its information would be lost in case of a failure.
+Hence, the fraud detector would possibly miss alerts if the application ever 
had to restart to recover from a failure.
+
+To address these challenges, Flink provides primitives for fault-tolerant 
state that are almost as easy to use as regular member variables.
+
+The most basic type of state in Flink is [ValueState]({{ site.baseurl 
}}/dev/stream/state/state.html#using-managed-keyed-state), a data type that 
adds fault tolerance to any variable it wraps.
+`ValueState` is a form of _keyed state_, meaning it is only available in 
operators that are applied in a _keyed context_; any operator immediately 
following `DataStream#keyBy`.
+A _keyed state_ of an operator is automatically scoped to the key of the 
record that is currently processed.
+In this example, the key is the account id for the current transaction (as 
declared by `keyBy()`), and `FraudDetector` maintains an independent state for 
each account. 
+`ValueState` is created using a `ValueStateDescriptor` which contains metadata 
about how Flink should manage the variable. The state should be registered 
before the function starts processing data.
+The right hook for this is the `open()` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, 
Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient ValueState<Boolean> flagState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new 
ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+  }
+{% endhighlight %}
+</div>
+</div>
+
+`ValueState` is a wrapper class, similar to `AtomicReference` or `AtomicLong` 
in the Java standard library.
+It provides three methods for interacting with its contents; `update` sets the 
state, `value` gets the current value, and `clear` deletes its contents.
+If the state for a particular key is empty, such as at the beginning of an 
application or after calling `ValueState#clear`, then `ValueState#value` will 
return `null`.
+Modifications to the object returned by `ValueState#value` are not guaranteed 
to be recognized by the system, and so all changes must be performed with 
`ValueState#update`.
+Otherwise, fault tolerance is managed automatically by Flink under the hood, 
and so you can interact with it like with any standard variable.
+
+Below, you can see an example of how you can use a flag state to track 
potential fraudulent transactions.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        // Get the current state for the current key
+        Boolean lastTransactionWasSmall = flagState.value();
+
+        // Check if the flag is set
+        if (lastTransactionWasSmall != null) {
+            if (transaction.getAmount() > LARGE_AMOUNT) {
+                // Output an alert downstream
+                Alert alert = new Alert();
+                alert.setId(transaction.getAccountId());
+
+                collector.collect(alert);            
+            }
+
+            // Clean up our state
+            flagState.clear();
+        }
+
+        if (transaction.getAmount() < SMALL_AMOUNT) {
+            // Set the flag to true
+            flagState.update(true);
+        }
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+  override def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    // Get the current state for the current key
+    val lastTransactionWasSmall = flagState.value
+
+    // Check if the flag is set
+    if (lastTransactionWasSmall != null) {
+      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+        // Output an alert downstream
+        val alert = new Alert
+        alert.setId(transaction.getAccountId)
+
+        collector.collect(alert)
+      }
+      // Clean up our state
+      flagState.clear()
+    }
+
+    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+      // set the flag to true
+      flagState.update(true)
+    }
+  }
+{% endhighlight %}
+</div>
+</div>
+
+For every transaction, the fraud detector checks the state of the flag for 
that account.
+Remember, `ValueState` is always scoped to the current key, i.e., account.
+If the flag is non-null, then the last transaction seen for that account was 
small, and so if the amount for this transaction is large, then the detector 
outputs a fraud alert.
+
+After that check, the flag state is unconditionally cleared.
+Either the current transaction caused a fraud alert, and the pattern is over, 
or the current transaction did not cause an alert, and the pattern is broken 
and needs to be restarted.
+
+Finally, the transaction amount is checked to see if it is small.
+If so, then the flag is set so that it can be checked by the next event.
+Notice that `ValueState<Boolean>` actually has three states, unset (`null`), 
`true`, and `false`, because all `ValueState`s are nullable.
+This job only makes use of unset (`null`) and `true` to check whether the 
flag is set or not.
+
+## Fraud Detector v2: State + Time = &#10084;&#65039;
+
+Scammers don't wait long to make their large purchase to reduce the chances 
their test transaction is noticed. 
+For example, suppose you wanted to set a 1 minute timeout to your fraud 
detector; i.e., in the previous example transactions 3 and 4 would only be 
considered fraud if they occurred within 1 minute of each other.
+Flink's `KeyedProcessFunction` allows you to set timers which invoke a 
callback method at some point in time in the future.
+
+Let's see how we can modify our Job to comply with our new requirements:
+
+* Whenever the flag is set to `true`, also set a timer for 1 minute in the 
future.
+* When the timer fires, reset the flag by clearing its state.
+* If the flag is ever cleared the timer should be canceled.
+
+To cancel a timer, you have to remember what time it is set for, and 
remembering implies state, so you will begin by creating a timer state along 
with your flag state.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+    private transient ValueState<Boolean> flagState;
+    private transient ValueState<Long> timerState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new 
ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+
+        ValueStateDescriptor<Long> timerDescriptor = new 
ValueStateDescriptor<>(
+                "timer-state",
+                Types.LONG);
+        timerState = getRuntimeContext().getState(timerDescriptor);
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+  @transient private var timerState: ValueState[java.lang.Long] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+
+    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+    timerState = getRuntimeContext.getState(timerDescriptor)
+  }
+{% endhighlight %}
+</div>
+</div>
+
+`KeyedProcessFunction#processElement` is called with a `Context` that contains 
a timer service.
+The timer service can be used to query the current time, register timers, and 
delete timers.
+With this, you can set a timer for 1 minute in the future every time the flag 
is set and store the timestamp in `timerState`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+if (transaction.getAmount() < SMALL_AMOUNT) {
+    // set the flag to true
+    flagState.update(true);
+
+    // set the timer and timer state
+    long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
+    context.timerService().registerProcessingTimeTimer(timer);
+    timerState.update(timer);
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+  // set the flag to true
+  flagState.update(true)
+
+  // set the timer and timer state
+  val timer = context.timerService.currentProcessingTime + 
FraudDetector.ONE_MINUTE
+  context.timerService.registerProcessingTimeTimer(timer)
+  timerState.update(timer)
+}
+{% endhighlight %}
+</div>
+</div>
+
+Processing time is wall clock time, and is determined by the system clock of 
the machine running the operator. 
+
+When a timer fires, it calls `KeyedProcessFunction#onTimer`. 
+Overriding this method is how you can implement your callback to reset the 
flag.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
+    // remove flag after 1 minute
+    timerState.clear();
+    flagState.clear();
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+override def onTimer(
+    timestamp: Long,
+    ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+    out: Collector[Alert]): Unit = {
+  // remove flag after 1 minute
+  timerState.clear()
+  flagState.clear()
+}
+{% endhighlight %}
+</div>
+</div>
+
+Finally, to cancel the timer, you need to delete the registered timer and 
delete the timer state.
+You can wrap this in a helper method and call this method instead of 
`flagState.clear()`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+private void cleanUp(Context ctx) throws Exception {
+    // delete timer
+    Long timer = timerState.value();
+    ctx.timerService().deleteProcessingTimeTimer(timer);
+
+    // clean up all state
+    timerState.clear();
+    flagState.clear();
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@throws[Exception]
+private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, 
Alert]#Context): Unit = {
+  // delete timer
+  val timer = timerState.value
+  ctx.timerService.deleteProcessingTimeTimer(timer)
+
+  // clean up all states
+  timerState.clear()
+  flagState.clear()
+}
+{% endhighlight %}
+</div>
+</div>
+
+And that's it, a fully functional, stateful, distributed streaming application!
+
+## Final Application
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, 
Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    private transient ValueState<Boolean> flagState;
+    private transient ValueState<Long> timerState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new 
ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+
+        ValueStateDescriptor<Long> timerDescriptor = new 
ValueStateDescriptor<>(
+                "timer-state",
+                Types.LONG);
+        timerState = getRuntimeContext().getState(timerDescriptor);
+    }
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        // Get the current state for the current key
+        Boolean lastTransactionWasSmall = flagState.value();
+
+        // Check if the flag is set
+        if (lastTransactionWasSmall != null) {
+            if (transaction.getAmount() > LARGE_AMOUNT) {
+                //Output an alert downstream
+                Alert alert = new Alert();
+                alert.setId(transaction.getAccountId());
+
+                collector.collect(alert);
+            }
+            // Clean up our state
+            cleanUp(context);
+        }
+
+        if (transaction.getAmount() < SMALL_AMOUNT) {
+            // set the flag to true
+            flagState.update(true);
+
+            long timer = context.timerService().currentProcessingTime() + 
ONE_MINUTE;
+            context.timerService().registerProcessingTimeTimer(timer);
+
+            timerState.update(timer);
+        }
+    }
+
+    @Override
+    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> 
out) {
+        // remove flag after 1 minute
+        timerState.clear();
+        flagState.clear();
+    }
+
+    private void cleanUp(Context ctx) throws Exception {
+        // delete timer
+        Long timer = timerState.value();
+        ctx.timerService().deleteProcessingTimeTimer(timer);
+
+        // clean up all state
+        timerState.clear();
+        flagState.clear();
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.scala.typeutils.Types
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+  @transient private var timerState: ValueState[java.lang.Long] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+
+    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+    timerState = getRuntimeContext.getState(timerDescriptor)
+  }
+
+  override def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    // Get the current state for the current key
+    val lastTransactionWasSmall = flagState.value
+
+    // Check if the flag is set
+    if (lastTransactionWasSmall != null) {
+      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+        // Output an alert downstream
+        val alert = new Alert
+        alert.setId(transaction.getAccountId)
+
+        collector.collect(alert)
+      }
+      // Clean up our state
+      cleanUp(context)
+    }
+
+    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+      // set the flag to true
+      flagState.update(true)
+      val timer = context.timerService.currentProcessingTime + 
FraudDetector.ONE_MINUTE
+
+      context.timerService.registerProcessingTimeTimer(timer)
+      timerState.update(timer)
+    }
+  }
+
+  override def onTimer(
+      timestamp: Long,
+      ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+      out: Collector[Alert]): Unit = {
+    // remove flag after 1 minute
+    timerState.clear()
+    flagState.clear()
+  }
+
+  @throws[Exception]
+  private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, 
Alert]#Context): Unit = {
+    // delete timer
+    val timer = timerState.value
+    ctx.timerService.deleteProcessingTimeTimer(timer)
+
+    // clean up all states
+    timerState.clear()
+    flagState.clear()
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+### Expected Output
+
+Running this code with the provided `TransactionSource` will emit fraud alerts 
for account 3.
+You should see the following output in your task manager logs: 
+
+{% highlight bash %}
+2019-08-19 14:22:06,220 INFO  
org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:11,383 INFO  
org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:16,551 INFO  
org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:21,723 INFO  
org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:26,896 INFO  
org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+{% endhighlight %}
diff --git a/docs/getting-started/walkthroughs/table_api.md 
b/docs/getting-started/walkthroughs/table_api.md
index 878fb54..f0cbb62 100644
--- a/docs/getting-started/walkthroughs/table_api.md
+++ b/docs/getting-started/walkthroughs/table_api.md
@@ -3,7 +3,7 @@ title: "Table API"
 nav-id: tableapiwalkthrough
 nav-title: 'Table API'
 nav-parent_id: walkthroughs
-nav-pos: 1
+nav-pos: 2
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/getting-started/walkthroughs/table_api.zh.md 
b/docs/getting-started/walkthroughs/table_api.zh.md
index 878fb54..f0cbb62 100644
--- a/docs/getting-started/walkthroughs/table_api.zh.md
+++ b/docs/getting-started/walkthroughs/table_api.zh.md
@@ -3,7 +3,7 @@ title: "Table API"
 nav-id: tableapiwalkthrough
 nav-title: 'Table API'
 nav-parent_id: walkthroughs
-nav-pos: 1
+nav-pos: 2
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index 3b34b560..9307188 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -140,6 +140,8 @@ run_test "Quickstarts Scala nightly end-to-end test" 
"$END_TO_END_DIR/test-scrip
 
 run_test "Walkthrough Table Java nightly end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_table_walkthroughs.sh java"
 run_test "Walkthrough Table Scala nightly end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_table_walkthroughs.sh scala"
+run_test "Walkthrough DataStream Java nightly end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_datastream_walkthroughs.sh java"
+run_test "Walkthrough DataStream Scala nightly end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_datastream_walkthroughs.sh scala"
 
 run_test "Test PubSub connector with Docker based Google PubSub Emulator" 
"$END_TO_END_DIR/test-scripts/test_streaming_gcp_pubsub.sh"
 
diff --git a/flink-end-to-end-tests/test-scripts/common.sh 
b/flink-end-to-end-tests/test-scripts/common.sh
index 2dc3787..bdecb32 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -745,3 +745,15 @@ function retry_times() {
     return 1
 }
 
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+
+function extract_job_id_from_job_submission_return() {
+    if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+        then
+            JOB_ID="${BASH_REMATCH[1]}";
+        else
+            JOB_ID=""
+        fi
+    echo "$JOB_ID"
+}
+
diff --git a/flink-end-to-end-tests/test-scripts/test_cli.sh 
b/flink-end-to-end-tests/test-scripts/test_cli.sh
index b9d285b..4b04890 100755
--- a/flink-end-to-end-tests/test-scripts/test_cli.sh
+++ b/flink-end-to-end-tests/test-scripts/test_cli.sh
@@ -29,23 +29,12 @@ $FLINK_DIR/bin/taskmanager.sh start
 $FLINK_DIR/bin/taskmanager.sh start
 
 # CLI regular expressions
-JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
 JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR="\"pact\": \"(Data Source)\""
 JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR="\"pact\": \"(Data Sink)\""
 JOB_LIST_REGEX_EXTRACTOR_BY_STATUS="([0-9,a-f]*) :"
 
 EXIT_CODE=0
 
-function extract_job_id_from_job_submission_return() {
-    if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
-        then
-            JOB_ID="${BASH_REMATCH[1]}";
-        else
-            JOB_ID=""
-        fi
-    echo "$JOB_ID"
-}
-
 function extract_valid_pact_from_job_info_return() {
     PACT_MATCH=0
     if [[ $1 =~ $JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR ]];
diff --git a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh 
b/flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh
similarity index 75%
copy from flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
copy to flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh
index 77afc58..e976927 100755
--- a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
+++ b/flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh
@@ -19,7 +19,7 @@
 
 # End to end test for quick starts test.
 # Usage:
-# FLINK_DIR=<flink dir> 
flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh <Type (java or 
scala)>
+# FLINK_DIR=<flink dir> 
flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh <Type (java 
or scala)>
 
 source "$(dirname "$0")"/common.sh
 
@@ -28,12 +28,12 @@ TEST_TYPE=$1
 mkdir -p "${TEST_DATA_DIR}"
 cd "${TEST_DATA_DIR}"
 
-ARTIFACT_ID=flink-walkthrough-table-${TEST_TYPE}
+ARTIFACT_ID=flink-walkthrough-datastream-${TEST_TYPE}
 ARTIFACT_VERSION=0.1
 
 mvn archetype:generate                                          \
     -DarchetypeGroupId=org.apache.flink                         \
-    -DarchetypeArtifactId=flink-walkthrough-table-${TEST_TYPE}  \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-${TEST_TYPE}  \
     -DarchetypeVersion=${FLINK_VERSION}                         \
     -DgroupId=org.apache.flink.walkthrough                      \
     -DartifactId=${ARTIFACT_ID}                                 \
@@ -46,7 +46,8 @@ cd "${ARTIFACT_ID}"
 mvn clean package -nsu > compile-output.txt
 
 if [[ `grep -c "BUILD FAILURE" compile-output.txt` -eq '1' ]]; then
-    echo "Failure: The walk-through did not successfully compile"
+    echo "Failure: The walkthrough did not successfully compile"
+    cat compile-output.txt
     exit 1
 fi
 
@@ -67,8 +68,28 @@ fi
 
 
TEST_PROGRAM_JAR=${TEST_DATA_DIR}/${ARTIFACT_ID}/target/${ARTIFACT_ID}-${ARTIFACT_VERSION}.jar
 
-add_optional_lib "table"
-
 start_cluster
 
-${FLINK_DIR}/bin/flink run "$TEST_PROGRAM_JAR"
+JOB_ID=""
+EXIT_CODE=0
+
+RETURN=`$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR`
+echo "$RETURN"
+JOB_ID=`extract_job_id_from_job_submission_return "$RETURN"`
+EXIT_CODE=$? # expect matching job id extraction
+
+if [ $EXIT_CODE == 0 ]; then
+    RETURN=`$FLINK_DIR/bin/flink list -r`
+    echo "$RETURN"
+    if [[ `echo "$RETURN" | grep -c "$JOB_ID"` -ne '1'  ]]; then # expect match for running job
+        echo "[FAIL] Unable to submit walkthrough"
+        EXIT_CODE=1
+    fi
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+    eval "$FLINK_DIR/bin/flink cancel ${JOB_ID}"
+    EXIT_CODE=$?
+fi
+
+exit $EXIT_CODE
diff --git a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh 
b/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
index 77afc58..f53e972 100755
--- a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
+++ b/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
@@ -47,6 +47,7 @@ mvn clean package -nsu > compile-output.txt
 
 if [[ `grep -c "BUILD FAILURE" compile-output.txt` -eq '1' ]]; then
     echo "Failure: The walk-through did not successfully compile"
+    cat compile-output.txt
     exit 1
 fi
 
diff --git 
a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Alert.java
 
b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Alert.java
new file mode 100644
index 0000000..9678fdb
--- /dev/null
+++ 
b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Alert.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.entity;
+
+import java.util.Objects;
+
+/**
+ * A simple alert event.
+ */
+@SuppressWarnings("unused")
+public final class Alert {
+
+       private long id;
+
+       public long getId() {
+               return id;
+       }
+
+       public void setId(long id) {
+               this.id = id;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               } else if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               Alert alert = (Alert) o;
+               return id == alert.id;
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(id);
+       }
+
+       @Override
+       public String toString() {
+               return "Alert{" +
+                       "id=" + id +
+                       '}';
+       }
+}
diff --git 
a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/AlertSink.java
 
b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/AlertSink.java
new file mode 100644
index 0000000..4332a8e
--- /dev/null
+++ 
b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/AlertSink.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.walkthrough.common.entity.Alert;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A sink for outputting alerts.
+ */
+@PublicEvolving
+@SuppressWarnings("unused")
+public class AlertSink implements SinkFunction<Alert> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(AlertSink.class);
+
+       @Override
+       public void invoke(Alert value, Context context) {
+               LOG.info(value.toString());
+       }
+}
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/pom.xml 
b/flink-walkthroughs/flink-walkthrough-datastream-java/pom.xml
new file mode 100644
index 0000000..8788a0f
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/pom.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+
+       <properties>
+               
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+       </properties>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-walkthroughs</artifactId>
+               <version>1.10-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-walkthrough-datastream-java</artifactId>
+       <packaging>maven-archetype</packaging>
+
+</project>
diff --git 
a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/META-INF/maven/archetype-metadata.xml
 
b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/META-INF/maven/archetype-metadata.xml
new file mode 100644
index 0000000..ed235f5
--- /dev/null
+++ 
b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<archetype-descriptor
+       
xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0";
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       
xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0
 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd";
+       name="flink-walkthrough-datastream-java">
+       <fileSets>
+               <fileSet filtered="true" packaged="true" encoding="UTF-8">
+                       <directory>src/main/java</directory>
+                       <includes>
+                               <include>**/*.java</include>
+                       </includes>
+               </fileSet>
+               <fileSet encoding="UTF-8">
+                       <directory>src/main/resources</directory>
+               </fileSet>
+       </fileSets>
+</archetype-descriptor>
diff --git 
a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/pom.xml
 
b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..8229d78
--- /dev/null
+++ 
b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,225 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+
+       <groupId>${groupId}</groupId>
+       <artifactId>${artifactId}</artifactId>
+       <version>${version}</version>
+       <packaging>jar</packaging>
+
+       <name>Flink Walkthrough DataStream Java</name>
+       <url>https://flink.apache.org</url>
+
+       <properties>
+               
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+               <flink.version>@project.version@</flink.version>
+               <java.version>1.8</java.version>
+               <scala.binary.version>2.11</scala.binary.version>
+               <maven.compiler.source>${java.version}</maven.compiler.source>
+               <maven.compiler.target>${java.version}</maven.compiler.target>
+       </properties>
+
+       <repositories>
+               <repository>
+                       <id>apache.snapshots</id>
+                       <name>Apache Development Snapshot Repository</name>
+                       
<url>https://repository.apache.org/content/repositories/snapshots/</url>
+                       <releases>
+                               <enabled>false</enabled>
+                       </releases>
+                       <snapshots>
+                               <enabled>true</enabled>
+                       </snapshots>
+               </repository>
+       </repositories>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+                       <version>${flink.version}</version>
+               </dependency>
+
+               <!-- This dependency is provided, because it should not be 
packaged into the JAR file. -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Add connector dependencies here. They must be in the 
default scope (compile). -->
+
+               <!-- Example:
+
+               <dependency>
+                   <groupId>org.apache.flink</groupId>
+                   
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+                   <version>${flink.version}</version>
+               </dependency>
+               -->
+
+               <!-- Add logging framework, to produce console output when 
running in the IDE. -->
+               <!-- These dependencies are excluded from the application JAR 
by default. -->
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-log4j12</artifactId>
+                       <version>1.7.7</version>
+                       <scope>runtime</scope>
+               </dependency>
+               <dependency>
+                       <groupId>log4j</groupId>
+                       <artifactId>log4j</artifactId>
+                       <version>1.2.17</version>
+                       <scope>runtime</scope>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+
+                       <!-- Java Compiler -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-compiler-plugin</artifactId>
+                               <version>3.1</version>
+                               <configuration>
+                                       <source>${java.version}</source>
+                                       <target>${java.version}</target>
+                               </configuration>
+                       </plugin>
+
+                       <!-- We use the maven-shade plugin to create a fat jar 
that contains all necessary dependencies. -->
+                       <!-- Change the value of <mainClass>...</mainClass> if 
your program entry point changes. -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <version>3.0.0</version>
+                               <executions>
+                                       <!-- Run shade goal on package phase -->
+                                       <execution>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <artifactSet>
+                                                               <excludes>
+                                                                       
<exclude>org.apache.flink:force-shading</exclude>
+                                                                       
<exclude>com.google.code.findbugs:jsr305</exclude>
+                                                                       
<exclude>org.slf4j:*</exclude>
+                                                                       
<exclude>log4j:*</exclude>
+                                                               </excludes>
+                                                       </artifactSet>
+                                                       <filters>
+                                                               <filter>
+                                                                       <!-- Do 
not copy the signatures in the META-INF folder.
+                                    Otherwise, this might cause 
SecurityExceptions when using the JAR. -->
+                                                                       
<artifact>*:*</artifact>
+                                                                       
<excludes>
+                                                                               
<exclude>META-INF/*.SF</exclude>
+                                                                               
<exclude>META-INF/*.DSA</exclude>
+                                                                               
<exclude>META-INF/*.RSA</exclude>
+                                                                       
</excludes>
+                                                               </filter>
+                                                       </filters>
+                                                       <transformers>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       
<mainClass>${package}.FraudDetectionJob</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+
+               <pluginManagement>
+                       <plugins>
+
+                               <!-- This improves the out-of-the-box 
experience in Eclipse by resolving some warnings. -->
+                               <plugin>
+                                       <groupId>org.eclipse.m2e</groupId>
+                                       
<artifactId>lifecycle-mapping</artifactId>
+                                       <version>1.0.0</version>
+                                       <configuration>
+                                               <lifecycleMappingMetadata>
+                                                       <pluginExecutions>
+                                                               
<pluginExecution>
+                                                                       
<pluginExecutionFilter>
+                                                                               
<groupId>org.apache.maven.plugins</groupId>
+                                                                               
<artifactId>maven-shade-plugin</artifactId>
+                                                                               
<versionRange>[3.0.0,)</versionRange>
+                                                                               
<goals>
+                                                                               
        <goal>shade</goal>
+                                                                               
</goals>
+                                                                       
</pluginExecutionFilter>
+                                                                       <action>
+                                                                               
<ignore/>
+                                                                       
</action>
+                                                               
</pluginExecution>
+                                                               
<pluginExecution>
+                                                                       
<pluginExecutionFilter>
+                                                                               
<groupId>org.apache.maven.plugins</groupId>
+                                                                               
<artifactId>maven-compiler-plugin</artifactId>
+                                                                               
<versionRange>[3.1,)</versionRange>
+                                                                               
<goals>
+                                                                               
        <goal>testCompile</goal>
+                                                                               
        <goal>compile</goal>
+                                                                               
</goals>
+                                                                       
</pluginExecutionFilter>
+                                                                       <action>
+                                                                               
<ignore/>
+                                                                       
</action>
+                                                               
</pluginExecution>
+                                                       </pluginExecutions>
+                                               </lifecycleMappingMetadata>
+                                       </configuration>
+                               </plugin>
+                       </plugins>
+               </pluginManagement>
+       </build>
+
+       <!-- This profile helps to make things run out of the box in IntelliJ 
-->
+       <!-- Its adds Flink's core classes to the runtime class path. -->
+       <!-- Otherwise they are missing in IntelliJ, because the dependency is 
'provided' -->
+       <profiles>
+               <profile>
+                       <id>add-dependencies-for-IDEA</id>
+
+                       <activation>
+                               <property>
+                                       <name>idea.version</name>
+                               </property>
+                       </activation>
+
+                       <dependencies>
+                               <dependency>
+                                       <groupId>org.apache.flink</groupId>
+                                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                                       <version>${flink.version}</version>
+                                       <scope>compile</scope>
+                               </dependency>
+                       </dependencies>
+               </profile>
+       </profiles>
+
+</project>
diff --git 
a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetectionJob.java
 
b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetectionJob.java
new file mode 100644
index 0000000..46019fd
--- /dev/null
+++ 
b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetectionJob.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package};
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+import org.apache.flink.walkthrough.common.sink.AlertSink;
+import org.apache.flink.walkthrough.common.source.TransactionSource;
+
+/**
+ * Skeleton code for the datastream walkthrough.
+ */
+public class FraudDetectionJob {
+       public static void main(String[] args) throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Transaction> transactions = env
+                       .addSource(new TransactionSource())
+                       .name("transactions");
+
+               DataStream<Alert> alerts = transactions
+                       .keyBy(Transaction::getAccountId)
+                       .process(new FraudDetector())
+                       .name("fraud-detector");
+
+               alerts
+                       .addSink(new AlertSink())
+                       .name("send-alerts");
+
+               env.execute("Fraud Detection");
+       }
+}
diff --git 
a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetector.java
 
b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetector.java
new file mode 100644
index 0000000..e5c034c
--- /dev/null
+++ 
b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package};
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+/**
+ * Skeleton code for implementing a fraud detector.
+ */
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, 
Alert> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final double SMALL_AMOUNT = 1.00;
+       private static final double LARGE_AMOUNT = 500.00;
+       private static final long ONE_MINUTE = 60 * 1000;
+
+       @Override
+       public void processElement(
+                       Transaction transaction,
+                       Context context,
+                       Collector<Alert> collector) throws Exception {
+
+               Alert alert = new Alert();
+               alert.setId(transaction.getAccountId());
+
+               collector.collect(alert);
+       }
+}
diff --git 
a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
 
b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8be9b9a
--- /dev/null
+++ 
b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=WARN, console
+log4j.logger.org.apache.flink.walkthrough.common.sink.AlertSink=INFO
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml 
b/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml
new file mode 100644
index 0000000..3e35e27
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+
+       <properties>
+               
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+       </properties>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-walkthroughs</artifactId>
+               <version>1.10-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-walkthrough-datastream-scala</artifactId>
+       <packaging>maven-archetype</packaging>
+
+</project>
diff --git 
a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
 
b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
new file mode 100644
index 0000000..e78dc8d
--- /dev/null
+++ 
b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<archetype-descriptor
+       
xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0";
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       
xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0
 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd";
+       name="flink-walkthrough-datastream-scala">
+       <fileSets>
+               <fileSet filtered="true" packaged="true" encoding="UTF-8">
+                       <directory>src/main/scala</directory>
+                       <includes>
+                               <include>**/*.scala</include>
+                       </includes>
+               </fileSet>
+               <fileSet encoding="UTF-8">
+                       <directory>src/main/resources</directory>
+               </fileSet>
+       </fileSets>
+</archetype-descriptor>
diff --git 
a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml
 
b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..93956fa
--- /dev/null
+++ 
b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,256 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+
+       <groupId>${groupId}</groupId>
+       <artifactId>${artifactId}</artifactId>
+       <version>${version}</version>
+       <packaging>jar</packaging>
+
+       <name>Flink Walkthrough DataStream Scala</name>
+       <url>https://flink.apache.org</url>
+
+       <repositories>
+               <repository>
+                       <id>apache.snapshots</id>
+                       <name>Apache Development Snapshot Repository</name>
+                       
<url>https://repository.apache.org/content/repositories/snapshots/</url>
+                       <releases>
+                               <enabled>false</enabled>
+                       </releases>
+                       <snapshots>
+                               <enabled>true</enabled>
+                       </snapshots>
+               </repository>
+       </repositories>
+
+       <properties>
+               
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+               <flink.version>@project.version@</flink.version>
+               <scala.binary.version>2.11</scala.binary.version>
+               <scala.version>2.11.12</scala.version>
+       </properties>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+                       <version>${flink.version}</version>
+               </dependency>
+
+               <!-- Apache Flink dependencies -->
+               <!-- These dependencies are provided, because they should not 
be packaged into the JAR file. -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Add connector dependencies here. They must be in the 
default scope (compile). -->
+
+               <!-- Example:
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+                       <version>${flink.version}</version>
+               </dependency>
+               -->
+
+               <!-- Add logging framework, to produce console output when 
running in the IDE. -->
+               <!-- These dependencies are excluded from the application JAR 
by default. -->
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-log4j12</artifactId>
+                       <version>1.7.7</version>
+                       <scope>runtime</scope>
+               </dependency>
+               <dependency>
+                       <groupId>log4j</groupId>
+                       <artifactId>log4j</artifactId>
+                       <version>1.2.17</version>
+                       <scope>runtime</scope>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <!-- We use the maven-shade plugin to create a fat jar 
that contains all necessary dependencies. -->
+                       <!-- Change the value of <mainClass>...</mainClass> if 
your program entry point changes. -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <version>3.0.0</version>
+                               <executions>
+                                       <!-- Run shade goal on package phase -->
+                                       <execution>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <artifactSet>
+                                                               <excludes>
+                                                                       
<exclude>org.apache.flink:force-shading</exclude>
+                                                                       
<exclude>com.google.code.findbugs:jsr305</exclude>
+                                                                       
<exclude>org.slf4j:*</exclude>
+                                                                       
<exclude>log4j:*</exclude>
+                                                               </excludes>
+                                                       </artifactSet>
+                                                       <filters>
+                                                               <filter>
+                                                                       <!-- Do 
not copy the signatures in the META-INF folder.
+                                                                       
Otherwise, this might cause SecurityExceptions when using the JAR. -->
+                                                                       
<artifact>*:*</artifact>
+                                                                       
<excludes>
+                                                                               
<exclude>META-INF/*.SF</exclude>
+                                                                               
<exclude>META-INF/*.DSA</exclude>
+                                                                               
<exclude>META-INF/*.RSA</exclude>
+                                                                       
</excludes>
+                                                               </filter>
+                                                       </filters>
+                                                       <transformers>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       
<mainClass>${package}.FraudDetectionJob</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <!-- Java Compiler -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-compiler-plugin</artifactId>
+                               <version>3.1</version>
+                               <configuration>
+                                       <source>1.8</source>
+                                       <target>1.8</target>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Scala Compiler -->
+                       <plugin>
+                               <groupId>net.alchim31.maven</groupId>
+                               <artifactId>scala-maven-plugin</artifactId>
+                               <version>3.2.2</version>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>compile</goal>
+                                                       <goal>testCompile</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <!-- Eclipse Scala Integration -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-eclipse-plugin</artifactId>
+                               <version>2.8</version>
+                               <configuration>
+                                       <downloadSources>true</downloadSources>
+                                       <projectnatures>
+                                               
<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+                                               
<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+                                       </projectnatures>
+                                       <buildcommands>
+                                               
<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+                                       </buildcommands>
+                                       <classpathContainers>
+                                               
<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+                                               
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+                                       </classpathContainers>
+                                       <excludes>
+                                               
<exclude>org.scala-lang:scala-library</exclude>
+                                               
<exclude>org.scala-lang:scala-compiler</exclude>
+                                       </excludes>
+                                       <sourceIncludes>
+                                               
<sourceInclude>**/*.scala</sourceInclude>
+                                               
<sourceInclude>**/*.java</sourceInclude>
+                                       </sourceIncludes>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               
<artifactId>build-helper-maven-plugin</artifactId>
+                               <version>1.7</version>
+                               <executions>
+                                       <!-- Add src/main/scala to eclipse 
build path -->
+                                       <execution>
+                                               <id>add-source</id>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>add-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               
<source>src/main/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                                       <!-- Add src/test/scala to eclipse 
build path -->
+                                       <execution>
+                                               <id>add-test-source</id>
+                                               
<phase>generate-test-sources</phase>
+                                               <goals>
+                                                       
<goal>add-test-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               
<source>src/test/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+       <!-- This profile helps to make things run out of the box in IntelliJ 
-->
+       <!-- Its adds Flink's core classes to the runtime class path. -->
+       <!-- Otherwise they are missing in IntelliJ, because the dependency is 
'provided' -->
+       <profiles>
+               <profile>
+                       <id>add-dependencies-for-IDEA</id>
+
+                       <activation>
+                               <property>
+                                       <name>idea.version</name>
+                               </property>
+                       </activation>
+
+                       <dependencies>
+                               <dependency>
+                                       <groupId>org.apache.flink</groupId>
+                                       
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+                                       <version>${flink.version}</version>
+                                       <scope>compile</scope>
+                               </dependency>
+                       </dependencies>
+               </profile>
+       </profiles>
+
+</project>
diff --git 
a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties
 
b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8be9b9a
--- /dev/null
+++ 
b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=WARN, console
+log4j.logger.org.apache.flink.walkthrough.common.sink.AlertSink=INFO
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n
diff --git 
a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala
 
b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala
new file mode 100644
index 0000000..58e46e2
--- /dev/null
+++ 
b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.walkthrough.common.sink.AlertSink
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+import org.apache.flink.walkthrough.common.source.TransactionSource
+
+/**
+  * Skeleton code for the DataStream code walkthrough
+  */
+object FraudDetectionJob {
+
+  @throws[Exception]
+  def main(args: Array[String]): Unit = {
+    val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
+
+    val transactions: DataStream[Transaction] = env
+      .addSource(new TransactionSource)
+      .name("transactions")
+
+    val alerts: DataStream[Alert] = transactions
+      .keyBy(transaction => transaction.getAccountId)
+      .process(new FraudDetector)
+      .name("fraud-detector")
+
+    alerts
+      .addSink(new AlertSink)
+      .name("send-alerts")
+
+    env.execute("Fraud Detection")
+  }
+}
diff --git 
a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala
 
b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala
new file mode 100644
index 0000000..6d7d91d
--- /dev/null
+++ 
b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+/**
+  * Skeleton code for implementing a fraud detector.
+  */
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @throws[Exception]
+  def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    val alert = new Alert
+    alert.setId(transaction.getAccountId)
+
+    collector.collect(alert)
+  }
+}
diff --git a/flink-walkthroughs/pom.xml b/flink-walkthroughs/pom.xml
index 2733f59..ce499e7 100644
--- a/flink-walkthroughs/pom.xml
+++ b/flink-walkthroughs/pom.xml
@@ -36,6 +36,8 @@ under the License.
                <module>flink-walkthrough-common</module>
                <module>flink-walkthrough-table-java</module>
                <module>flink-walkthrough-table-scala</module>
+               <module>flink-walkthrough-datastream-java</module>
+               <module>flink-walkthrough-datastream-scala</module>
        </modules>
        <build>
                <extensions>

Reply via email to