http://git-wip-us.apache.org/repos/asf/flink/blob/31b86f60/docs/dev/stream/operators/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/index.md 
b/docs/dev/stream/operators/index.md
new file mode 100644
index 0000000..0ed0b2a
--- /dev/null
+++ b/docs/dev/stream/operators/index.md
@@ -0,0 +1,1169 @@
+---
+title: "Operators"
+nav-id: streaming_operators
+nav-show_overview: true
+nav-parent_id: streaming
+nav-pos: 9
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Operators transform one or more DataStreams into a new DataStream. Programs 
can combine
+multiple transformations into sophisticated dataflow topologies.
+
+This section describes the basic transformations, the effective physical
+partitioning after applying them, as well as insights into Flink's operator chaining.
+
+* toc
+{:toc}
+
+# DataStream Transformations
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+          <td><strong>Map</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces one element. A map function that 
doubles the values of the input stream:</p>
+    {% highlight java %}
+DataStream<Integer> dataStream = //...
+dataStream.map(new MapFunction<Integer, Integer>() {
+    @Override
+    public Integer map(Integer value) throws Exception {
+        return 2 * value;
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+
+        <tr>
+          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces zero, one, or more elements. A flatMap function that splits sentences into words:</p>
+    {% highlight java %}
+dataStream.flatMap(new FlatMapFunction<String, String>() {
+    @Override
+    public void flatMap(String value, Collector<String> out)
+        throws Exception {
+        for(String word: value.split(" ")){
+            out.collect(word);
+        }
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Evaluates a boolean function for each element and retains those 
for which the function returns true.
+            A filter that filters out zero values:
+            </p>
+    {% highlight java %}
+dataStream.filter(new FilterFunction<Integer>() {
+    @Override
+    public boolean filter(Integer value) throws Exception {
+        return value != 0;
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
+          <td>
+            <p>Logically partitions a stream into disjoint partitions, each 
partition containing elements of the same key.
+            Internally, this is implemented with hash partitioning. See <a 
href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys</a> on how 
to specify keys.
+            This transformation returns a KeyedStream.</p>
+    {% highlight java %}
+dataStream.keyBy("someKey") // Key by field "someKey"
+dataStream.keyBy(0) // Key by the first element of a Tuple
+    {% endhighlight %}
+            <p>
+            <span class="label label-danger">Attention</span>
+            A type <strong>cannot be a key</strong> if:
+           <ol>
+           <li> it is a POJO type but does not override the 
<em>hashCode()</em> method and
+           relies on the <em>Object.hashCode()</em> implementation.</li>
+           <li> it is an array of any type.</li>
+           </ol>
+           </p>
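+            <p>For example, a POJO such as the following (a hypothetical sketch, not part of the Flink API) is a valid key type because it overrides <em>hashCode()</em>:</p>
+    {% highlight java %}
+public class WordWithCount {
+    public String word;
+    public int count;
+
+    @Override
+    public int hashCode() {
+        // deterministic hash based on the fields, not on object identity
+        return 31 * word.hashCode() + count;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        // kept consistent with hashCode()
+        if (!(o instanceof WordWithCount)) {
+            return false;
+        }
+        WordWithCount other = (WordWithCount) o;
+        return word.equals(other.word) && count == other.count;
+    }
+}
+    {% endhighlight %}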
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+            <p>A "rolling" reduce on a keyed data stream. Combines the current 
element with the last reduced value and
+            emits the new value.
+                    <br/>
+               <br/>
+            A reduce function that creates a stream of partial sums:</p>
+            {% highlight java %}
+keyedStream.reduce(new ReduceFunction<Integer>() {
+    @Override
+    public Integer reduce(Integer value1, Integer value2)
+    throws Exception {
+        return value1 + value2;
+    }
+});
+            {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+          <p>A "rolling" fold on a keyed data stream with an initial value.
+          Combines the current element with the last folded value and
+          emits the new value.
+          <br/>
+          <br/>
+          <p>A fold function that, when applied on the sequence (1,2,3,4,5),
+          emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
+          {% highlight java %}
+DataStream<String> result =
+  keyedStream.fold("start", new FoldFunction<Integer, String>() {
+    @Override
+    public String fold(String current, Integer value) {
+        return current + "-" + value;
+    }
+  });
+          {% endhighlight %}
+          </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Aggregations</strong><br>KeyedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Rolling aggregations on a keyed data stream. The difference 
between min
+           and minBy is that min returns the minimum value, whereas minBy 
returns
+           the element that has the minimum value in this field (same for max 
and maxBy).</p>
+    {% highlight java %}
+keyedStream.sum(0);
+keyedStream.sum("key");
+keyedStream.min(0);
+keyedStream.min("key");
+keyedStream.max(0);
+keyedStream.max("key");
+keyedStream.minBy(0);
+keyedStream.minBy("key");
+keyedStream.maxBy(0);
+keyedStream.maxBy("key");
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
+          <td>
+            <p>Windows can be defined on already partitioned KeyedStreams. 
Windows group the data in each
+            key according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
+            See <a href="windows.html">windows</a> for a complete description 
of windows.
+    {% highlight java %}
+dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // 
Last 5 seconds of data
+    {% endhighlight %}
+        </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>WindowAll</strong><br>DataStream &rarr; 
AllWindowedStream</td>
+          <td>
+              <p>Windows can be defined on regular DataStreams. Windows group 
all the stream events
+              according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
+              See <a href="windows.html">windows</a> for a complete 
description of windows.</p>
+              <p><strong>WARNING:</strong> This is in many cases a 
<strong>non-parallel</strong> transformation. All records will be
+               gathered in one task for the windowAll operator.</p>
+  {% highlight java %}
+dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 
seconds of data
+  {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Apply</strong><br>WindowedStream &rarr; 
DataStream<br>AllWindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a general function to the window as a whole. Below is a 
function that manually sums the elements of a window.</p>
+            <p><strong>Note:</strong> If you are using a windowAll 
transformation, you need to use an AllWindowFunction instead.</p>
+    {% highlight java %}
+windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, 
Tuple, Window>() {
+    public void apply (Tuple tuple,
+            Window window,
+            Iterable<Tuple2<String, Integer>> values,
+            Collector<Integer> out) throws Exception {
+        int sum = 0;
+        for (Tuple2<String, Integer> t : values) {
+            sum += t.f1;
+        }
+        out.collect (new Integer(sum));
+    }
+});
+
+// applying an AllWindowFunction on non-keyed window stream
+allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, 
Integer, Window>() {
+    public void apply (Window window,
+            Iterable<Tuple2<String, Integer>> values,
+            Collector<Integer> out) throws Exception {
+        int sum = 0;
+        for (Tuple2<String, Integer> t : values) {
+            sum += t.f1;
+        }
+        out.collect (new Integer(sum));
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Applies a functional reduce function to the window and returns 
the reduced value.</p>
+    {% highlight java %}
+windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
+    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, 
Tuple2<String, Integer> value2) throws Exception {
+        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Fold</strong><br>WindowedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Applies a functional fold function to the window and returns 
the folded value.
+               The example function, when applied on the sequence (1,2,3,4,5),
+               folds the sequence into the string "start-1-2-3-4-5":</p>
+    {% highlight java %}
+windowedStream.fold("start", new FoldFunction<Integer, String>() {
+    public String fold(String current, Integer value) {
+        return current + "-" + value;
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Aggregations on windows</strong><br>WindowedStream 
&rarr; DataStream</td>
+          <td>
+            <p>Aggregates the contents of a window. The difference between min
+           and minBy is that min returns the minimum value, whereas minBy returns
+           the element that has the minimum value in this field (same for max 
and maxBy).</p>
+    {% highlight java %}
+windowedStream.sum(0);
+windowedStream.sum("key");
+windowedStream.min(0);
+windowedStream.min("key");
+windowedStream.max(0);
+windowedStream.max("key");
+windowedStream.minBy(0);
+windowedStream.minBy("key");
+windowedStream.maxBy(0);
+windowedStream.maxBy("key");
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
+          <td>
+            <p>Union of two or more data streams, creating a new stream containing all the elements from all the streams. Note: If you union a data stream
+            with itself, you will get each element twice in the resulting stream.</p>
+    {% highlight java %}
+dataStream.union(otherStream1, otherStream2, ...);
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>Join two data streams on a given key and a common window.</p>
+    {% highlight java %}
+dataStream.join(otherStream)
+    .where(<key selector>).equalTo(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
+    .apply (new JoinFunction () {...});
+    {% endhighlight %}
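+            <p>A more complete sketch, assuming two streams of <code>Tuple2&lt;String, Integer&gt;</code> keyed by their first field (the element types and join logic are illustrative assumptions):</p>
+    {% highlight java %}
+DataStream<Tuple2<String, Integer>> joined = dataStream.join(otherStream)
+    .where(new KeySelector<Tuple2<String, Integer>, String>() {
+        @Override
+        public String getKey(Tuple2<String, Integer> value) {
+            return value.f0;
+        }
+    })
+    .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
+        @Override
+        public String getKey(Tuple2<String, Integer> value) {
+            return value.f0;
+        }
+    })
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
+    .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+        @Override
+        public Tuple2<String, Integer> join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) {
+            // sum the counts of the two matching elements
+            return new Tuple2<>(first.f0, first.f1 + second.f1);
+        }
+    });
+    {% endhighlight %}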
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>Cogroups two data streams on a given key and a common 
window.</p>
+    {% highlight java %}
+dataStream.coGroup(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
+    .apply (new CoGroupFunction () {...});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; 
ConnectedStreams</td>
+          <td>
+            <p>"Connects" two data streams retaining their types. Connect 
allowing for shared state between
+            the two streams.</p>
+    {% highlight java %}
+DataStream<Integer> someStream = //...
+DataStream<String> otherStream = //...
+
+ConnectedStreams<Integer, String> connectedStreams = 
someStream.connect(otherStream);
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; 
DataStream</td>
+          <td>
+            <p>Similar to map and flatMap on a connected data stream.</p>
+    {% highlight java %}
+connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
+    @Override
+    public Boolean map1(Integer value) {
+        return true;
+    }
+
+    @Override
+    public Boolean map2(String value) {
+        return false;
+    }
+});
+connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
+
+   @Override
+   public void flatMap1(Integer value, Collector<String> out) {
+       out.collect(value.toString());
+   }
+
+   @Override
+   public void flatMap2(String value, Collector<String> out) {
+       for (String word: value.split(" ")) {
+         out.collect(word);
+       }
+   }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
+          <td>
+            <p>
+                Split the stream into two or more streams according to some 
criterion.
+                {% highlight java %}
+SplitStream<Integer> split = someDataStream.split(new 
OutputSelector<Integer>() {
+    @Override
+    public Iterable<String> select(Integer value) {
+        List<String> output = new ArrayList<String>();
+        if (value % 2 == 0) {
+            output.add("even");
+        }
+        else {
+            output.add("odd");
+        }
+        return output;
+    }
+});
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
+          <td>
+            <p>
+                Select one or more streams from a split stream.
+                {% highlight java %}
+SplitStream<Integer> split;
+DataStream<Integer> even = split.select("even");
+DataStream<Integer> odd = split.select("odd");
+DataStream<Integer> all = split.select("even","odd");
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream 
&rarr; DataStream</td>
+          <td>
+            <p>
+                Creates a "feedback" loop in the flow, by redirecting the 
output of one operator
+                to some previous operator. This is especially useful for 
defining algorithms that
+                continuously update a model. The following code starts with a 
stream and applies
+               the iteration body continuously. Elements that are greater than 
0 are sent back
+               to the feedback channel, and the rest of the elements are 
forwarded downstream.
+               See <a href="#iterations">iterations</a> for a complete 
description.
+                {% highlight java %}
+IterativeStream<Long> iteration = initialStream.iterate();
+DataStream<Long> iterationBody = iteration.map (/*do something*/);
+DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
+    @Override
+    public boolean filter(Long value) throws Exception {
+        return value > 0;
+    }
+});
+iteration.closeWith(feedback);
+DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
+    @Override
+    public boolean filter(Long value) throws Exception {
+        return value <= 0;
+    }
+});
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>
+                Extracts timestamps from records in order to work with windows
+                that use event time semantics. See <a href="{{ site.baseurl 
}}/dev/event_time.html">Event Time</a>.
+                {% highlight java %}
+stream.assignTimestamps(new TimestampExtractor() {...});
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+          <td><strong>Map</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces one element. A map function that 
doubles the values of the input stream:</p>
+    {% highlight scala %}
+dataStream.map { x => x * 2 }
+    {% endhighlight %}
+          </td>
+        </tr>
+
+        <tr>
+          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces zero, one, or more elements. A flatMap function that splits sentences into words:</p>
+    {% highlight scala %}
+dataStream.flatMap { str => str.split(" ") }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Evaluates a boolean function for each element and retains those 
for which the function returns true.
+            A filter that filters out zero values:
+            </p>
+    {% highlight scala %}
+dataStream.filter { _ != 0 }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
+          <td>
+            <p>Logically partitions a stream into disjoint partitions, each 
partition containing elements of the same key.
+            Internally, this is implemented with hash partitioning. See <a 
href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys</a> on how 
to specify keys.
+            This transformation returns a KeyedStream.</p>
+    {% highlight scala %}
+dataStream.keyBy("someKey") // Key by field "someKey"
+dataStream.keyBy(0) // Key by the first element of a Tuple
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+            <p>A "rolling" reduce on a keyed data stream. Combines the current 
element with the last reduced value and
+            emits the new value.
+                    <br/>
+               <br/>
+            A reduce function that creates a stream of partial sums:</p>
+            {% highlight scala %}
+keyedStream.reduce { _ + _ }
+            {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+          <p>A "rolling" fold on a keyed data stream with an initial value.
+          Combines the current element with the last folded value and
+          emits the new value.
+          <br/>
+          <br/>
+          <p>A fold function that, when applied on the sequence (1,2,3,4,5),
+          emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
+          {% highlight scala %}
+val result: DataStream[String] =
+    keyedStream.fold("start")((str, i) => { str + "-" + i })
+          {% endhighlight %}
+          </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Aggregations</strong><br>KeyedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Rolling aggregations on a keyed data stream. The difference 
between min
+           and minBy is that min returns the minimum value, whereas minBy returns
+           the element that has the minimum value in this field (same for max 
and maxBy).</p>
+    {% highlight scala %}
+keyedStream.sum(0)
+keyedStream.sum("key")
+keyedStream.min(0)
+keyedStream.min("key")
+keyedStream.max(0)
+keyedStream.max("key")
+keyedStream.minBy(0)
+keyedStream.minBy("key")
+keyedStream.maxBy(0)
+keyedStream.maxBy("key")
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
+          <td>
+            <p>Windows can be defined on already partitioned KeyedStreams. 
Windows group the data in each
+            key according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
+            See <a href="windows.html">windows</a> for a description of 
windows.
+    {% highlight scala %}
+dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // 
Last 5 seconds of data
+    {% endhighlight %}
+        </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>WindowAll</strong><br>DataStream &rarr; 
AllWindowedStream</td>
+          <td>
+              <p>Windows can be defined on regular DataStreams. Windows group 
all the stream events
+              according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
+              See <a href="windows.html">windows</a> for a complete 
description of windows.</p>
+              <p><strong>WARNING:</strong> This is in many cases a 
<strong>non-parallel</strong> transformation. All records will be
+               gathered in one task for the windowAll operator.</p>
+  {% highlight scala %}
+dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 
seconds of data
+  {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Apply</strong><br>WindowedStream &rarr; 
DataStream<br>AllWindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a general function to the window as a whole. Below is a 
function that manually sums the elements of a window.</p>
+            <p><strong>Note:</strong> If you are using a windowAll 
transformation, you need to use an AllWindowFunction instead.</p>
+    {% highlight scala %}
+windowedStream.apply { WindowFunction }
+
+// applying an AllWindowFunction on non-keyed window stream
+allWindowedStream.apply { AllWindowFunction }
+
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Applies a functional reduce function to the window and returns 
the reduced value.</p>
+    {% highlight scala %}
+windowedStream.reduce { _ + _ }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Fold</strong><br>WindowedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Applies a functional fold function to the window and returns 
the folded value.
+               The example function, when applied on the sequence (1,2,3,4,5),
+               folds the sequence into the string "start-1-2-3-4-5":</p>
+          {% highlight scala %}
+val result: DataStream[String] =
+    windowedStream.fold("start", (str, i) => { str + "-" + i })
+          {% endhighlight %}
+          </td>
+       </tr>
+        <tr>
+          <td><strong>Aggregations on windows</strong><br>WindowedStream 
&rarr; DataStream</td>
+          <td>
+            <p>Aggregates the contents of a window. The difference between min
+           and minBy is that min returns the minimum value, whereas minBy 
returns
+           the element that has the minimum value in this field (same for max 
and maxBy).</p>
+    {% highlight scala %}
+windowedStream.sum(0)
+windowedStream.sum("key")
+windowedStream.min(0)
+windowedStream.min("key")
+windowedStream.max(0)
+windowedStream.max("key")
+windowedStream.minBy(0)
+windowedStream.minBy("key")
+windowedStream.maxBy(0)
+windowedStream.maxBy("key")
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
+          <td>
+            <p>Union of two or more data streams, creating a new stream containing all the elements from all the streams. Note: If you union a data stream
+            with itself, you will get each element twice in the resulting stream.</p>
+    {% highlight scala %}
+dataStream.union(otherStream1, otherStream2, ...)
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>Join two data streams on a given key and a common window.</p>
+    {% highlight scala %}
+dataStream.join(otherStream)
+    .where(<key selector>).equalTo(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
+    .apply { ... }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>Cogroups two data streams on a given key and a common 
window.</p>
+    {% highlight scala %}
+dataStream.coGroup(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
+    .apply { ... }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; 
ConnectedStreams</td>
+          <td>
+            <p>"Connects" two data streams retaining their types, allowing for 
shared state between
+            the two streams.</p>
+    {% highlight scala %}
+someStream : DataStream[Int] = ...
+otherStream : DataStream[String] = ...
+
+val connectedStreams = someStream.connect(otherStream)
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; 
DataStream</td>
+          <td>
+            <p>Similar to map and flatMap on a connected data stream.</p>
+    {% highlight scala %}
+connectedStreams.map(
+    (_ : Int) => true,
+    (_ : String) => false
+)
+connectedStreams.flatMap(
+    (num : Int) => List(num.toString),
+    (str : String) => str.split(" ").toList
+)
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
+          <td>
+            <p>
+                Split the stream into two or more streams according to some 
criterion.
+                {% highlight scala %}
+val split = someDataStream.split(
+  (num: Int) =>
+    (num % 2) match {
+      case 0 => List("even")
+      case 1 => List("odd")
+    }
+)
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
+          <td>
+            <p>
+                Select one or more streams from a split stream.
+                {% highlight scala %}
+
+val even = split select "even"
+val odd = split select "odd"
+val all = split.select("even","odd")
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream  
&rarr; DataStream</td>
+          <td>
+            <p>
+                Creates a "feedback" loop in the flow, by redirecting the 
output of one operator
+                to some previous operator. This is especially useful for 
defining algorithms that
+                continuously update a model. The following code starts with a 
stream and applies
+               the iteration body continuously. Elements that are greater than 
0 are sent back
+               to the feedback channel, and the rest of the elements are 
forwarded downstream.
+               See <a href="#iterations">iterations</a> for a complete 
description.
+                {% highlight scala %}
+initialStream.iterate {
+  iteration => {
+    val iterationBody = iteration.map {/*do something*/}
+    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
+  }
+}
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>
+                Extracts timestamps from records in order to work with windows
+                that use event time semantics.
+                See <a href="{{ site.baseurl }}/dev/event_time.html">Event 
Time</a>.
+                {% highlight scala %}
+stream.assignTimestamps { timestampExtractor }
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+  </tbody>
+</table>
+
+Extraction from tuples, case classes and collections via anonymous pattern 
matching, like the following:
+{% highlight scala %}
+val data: DataStream[(Int, String, Double)] = // [...]
+data.map {
+  case (id, name, temperature) => // [...]
+}
+{% endhighlight %}
+is not supported by the API out-of-the-box. To use this feature, you should 
use a <a href="{{ site.baseurl }}/dev/scala_api_extensions.html">Scala API 
extension</a>.
+
+
+</div>
+</div>
+
+The following transformations are available on data streams of Tuples:
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Project</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>Selects a subset of fields from the tuples
+{% highlight java %}
+DataStream<Tuple3<Integer, Double, String>> in = // [...]
+DataStream<Tuple2<String, Integer>> out = in.project(2,0);
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+
+# Physical partitioning
+
+Flink also gives low-level control (if desired) on the exact stream 
partitioning after a transformation,
+via the following functions.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Custom partitioning</strong><br>DataStream &rarr; 
DataStream</td>
+      <td>
+        <p>
+            Uses a user-defined Partitioner to select the target task for each 
element.
+            {% highlight java %}
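+// a minimal sketch of a user-defined Partitioner (hypothetical; assumes the
+// key field "someKey" is a String) that derives the target channel from the key
+Partitioner<String> partitioner = new Partitioner<String>() {
+    @Override
+    public int partition(String key, int numPartitions) {
+        return Math.abs(key.hashCode() % numPartitions);
+    }
+};
+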
+dataStream.partitionCustom(partitioner, "someKey");
+dataStream.partitionCustom(partitioner, 0);
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+     <td><strong>Random partitioning</strong><br>DataStream &rarr; 
DataStream</td>
+     <td>
+       <p>
+            Partitions elements randomly according to a uniform distribution.
+            {% highlight java %}
+dataStream.shuffle();
+            {% endhighlight %}
+       </p>
+     </td>
+   </tr>
+   <tr>
+      <td><strong>Rebalancing (Round-robin 
partitioning)</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements round-robin, creating equal load per 
partition. Useful for performance
+            optimization in the presence of data skew.
+            {% highlight java %}
+dataStream.rebalance();
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Rescaling</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements, round-robin, to a subset of downstream 
operations. This is
+            useful if you want to have pipelines where you, for example, fan 
out from
+            each parallel instance of a source to a subset of several mappers 
to distribute load
+            but don't want the full rebalance that rebalance() would incur. 
This would require only
+            local data transfers instead of transferring data over network, 
depending on
+            other configuration values such as the number of slots of 
TaskManagers.
+        </p>
+        <p>
+            The subset of downstream operations to which the upstream 
operation sends
+            elements depends on the degree of parallelism of both the upstream 
and downstream operation.
+            For example, if the upstream operation has parallelism 2 and the 
downstream operation
+            has parallelism 6, then one upstream operation would distribute 
elements to three
+            downstream operations while the other upstream operation would 
distribute to the other
+            three downstream operations. If, on the other hand, the downstream 
operation has parallelism
+            2 while the upstream operation has parallelism 6 then three 
upstream operations would
+            distribute to one downstream operation while the other three 
upstream operations would
+            distribute to the other downstream operation.
+        </p>
+        <p>
+            In cases where the different parallelisms are not multiples of 
each other one or several
+            downstream operations will have a differing number of inputs from 
upstream operations.
+        </p>
+        <p>
+            Please see this figure for a visualization of the connection 
pattern in the above
+            example:
+        </p>
+
+        <div style="text-align: center">
+            <img src="{{ site.baseurl }}/fig/rescale.svg" alt="Connection pattern of the rescale transformation" />
+            </div>
+
+
+        <p>
+                    {% highlight java %}
+dataStream.rescale();
+            {% endhighlight %}
+
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Broadcasts elements to every partition.
+            {% highlight java %}
+dataStream.broadcast();
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Custom partitioning</strong><br>DataStream &rarr; 
DataStream</td>
+      <td>
+        <p>
+            Uses a user-defined Partitioner to select the target task for each 
element.
+            {% highlight scala %}
+dataStream.partitionCustom(partitioner, "someKey")
+dataStream.partitionCustom(partitioner, 0)
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+     <td><strong>Random partitioning</strong><br>DataStream &rarr; 
DataStream</td>
+     <td>
+       <p>
+            Partitions elements randomly according to a uniform distribution.
+            {% highlight scala %}
+dataStream.shuffle()
+            {% endhighlight %}
+       </p>
+     </td>
+   </tr>
+   <tr>
+      <td><strong>Rebalancing (Round-robin 
partitioning)</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements round-robin, creating equal load per 
partition. Useful for performance
+            optimization in the presence of data skew.
+            {% highlight scala %}
+dataStream.rebalance()
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Rescaling</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements, round-robin, to a subset of downstream 
operations. This is
+            useful if you want to have pipelines where you, for example, fan 
out from
+            each parallel instance of a source to a subset of several mappers 
to distribute load
+            but don't want the full rebalance that rebalance() would incur. 
This would require only
+            local data transfers instead of transferring data over network, 
depending on
+            other configuration values such as the number of slots of 
TaskManagers.
+        </p>
+        <p>
+            The subset of downstream operations to which the upstream 
operation sends
+            elements depends on the degree of parallelism of both the upstream 
and downstream operation.
+            For example, if the upstream operation has parallelism 2 and the 
downstream operation
+            has parallelism 4, then one upstream operation would distribute 
elements to two
+            downstream operations while the other upstream operation would 
distribute to the other
+            two downstream operations. If, on the other hand, the downstream 
operation has parallelism
+            2 while the upstream operation has parallelism 4 then two upstream 
operations would
+            distribute to one downstream operation while the other two upstream operations would
+            distribute to the other downstream operation.
+        </p>
+        <p>
+            In cases where the different parallelisms are not multiples of 
each other one or several
+            downstream operations will have a differing number of inputs from 
upstream operations.
+
+        </p>
+        <p>
+            Please see this figure for a visualization of the connection 
pattern in the above
+            example:
+        </p>
+
+        <div style="text-align: center">
+            <img src="{{ site.baseurl }}/fig/rescale.svg" alt="Connection pattern of the rescale transformation" />
+            </div>
+
+
+        <p>
+                    {% highlight scala %}
+dataStream.rescale()
+            {% endhighlight %}
+
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Broadcasts elements to every partition.
+            {% highlight scala %}
+dataStream.broadcast()
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+# Task chaining and resource groups
+
+Chaining two subsequent transformations means co-locating them within the same 
thread for better
+performance. Flink by default chains operators if this is possible (e.g., two 
subsequent map
+transformations). The API gives fine-grained control over chaining if desired:
+
+Use `StreamExecutionEnvironment.disableOperatorChaining()` if you want to 
disable chaining in
+the whole job. For more fine grained control, the following functions are 
available. Note that
+these functions can only be used right after a DataStream transformation as 
they refer to the
+previous transformation. For example, you can use 
`someStream.map(...).startNewChain()`, but
+you cannot use `someStream.startNewChain()`.
+
+A resource group is a slot in Flink, see
+[slots]({{site.baseurl}}/ops/config.html#configuring-taskmanager-processing-slots).
 You can
+manually isolate operators in separate slots if desired.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td>Start new chain</td>
+      <td>
+        <p>Begin a new chain, starting with this operator. The two
+       mappers will be chained, and filter will not be chained to
+       the first mapper.
+{% highlight java %}
+someStream.filter(...).map(...).startNewChain().map(...);
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td>Disable chaining</td>
+      <td>
+        <p>Do not chain the map operator
+{% highlight java %}
+someStream.map(...).disableChaining();
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td>Set slot sharing group</td>
+      <td>
+        <p>Set the slot sharing group of an operation. Flink will put 
operations with the same
+        slot sharing group into the same slot while keeping operations that 
don't have the
+        slot sharing group in other slots. This can be used to isolate slots. 
The slot sharing
+        group is inherited from input operations if all input operations are 
in the same slot
+        sharing group.
+        The name of the default slot sharing group is "default"; operations can explicitly
+        be put into this group by calling slotSharingGroup("default").
+{% highlight java %}
+someStream.filter(...).slotSharingGroup("name");
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td>Start new chain</td>
+      <td>
+        <p>Begin a new chain, starting with this operator. The two
+       mappers will be chained, and filter will not be chained to
+       the first mapper.
+{% highlight scala %}
+someStream.filter(...).map(...).startNewChain().map(...)
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td>Disable chaining</td>
+      <td>
+        <p>Do not chain the map operator
+{% highlight scala %}
+someStream.map(...).disableChaining()
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  <tr>
+      <td>Set slot sharing group</td>
+      <td>
+        <p>Set the slot sharing group of an operation. Flink will put 
operations with the same
+        slot sharing group into the same slot while keeping operations that 
don't have the
+        slot sharing group in other slots. This can be used to isolate slots. 
The slot sharing
+        group is inherited from input operations if all input operations are 
in the same slot
+        sharing group.
+        The name of the default slot sharing group is "default"; operations can explicitly
+        be put into this group by calling slotSharingGroup("default").
+{% highlight scala %}
+someStream.filter(...).slotSharingGroup("name")
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+
+{% top %}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/31b86f60/docs/dev/stream/operators/process_function.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/process_function.md 
b/docs/dev/stream/operators/process_function.md
new file mode 100644
index 0000000..9f32359
--- /dev/null
+++ b/docs/dev/stream/operators/process_function.md
@@ -0,0 +1,238 @@
+---
+title: "Process Function (Low-level Operations)"
+nav-title: "Process Function"
+nav-parent_id: streaming_operators
+nav-pos: 35
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## The ProcessFunction
+
+The `ProcessFunction` is a low-level stream processing operation, giving 
access to the basic building blocks of
+all (acyclic) streaming applications:
+
+  - events (stream elements)
+  - state (fault-tolerant, consistent, only on keyed stream)
+  - timers (event time and processing time, only on keyed stream)
+
+The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to 
keyed state and timers. It handles events
+by being invoked for each event received in the input stream(s).
+
+For fault-tolerant state, the `ProcessFunction` gives access to Flink's [keyed 
state]({{ site.baseurl }}/dev/stream/state/state.html), accessible via the
+`RuntimeContext`, similar to the way other stateful functions can access keyed 
state.
+
+The timers allow applications to react to changes in processing time and in 
[event time]({{ site.baseurl }}/dev/event_time.html).
+Every call to the function `processElement(...)` gets a `Context` object which 
gives access to the element's
+event time timestamp, and to the *TimerService*. The `TimerService` can be 
used to register callbacks for future
+event-/processing-time instants. When a timer's particular time is reached, 
the `onTimer(...)` method is
+called. During that call, all states are again scoped to the key with which 
the timer was created, allowing
+timers to manipulate keyed state.
+
+<span class="label label-info">Note</span> If you want to access keyed state 
and timers you have
+to apply the `ProcessFunction` on a keyed stream:
+
+{% highlight java %}
+stream.keyBy(...).process(new MyProcessFunction())
+{% endhighlight %}
+
+
+## Low-level Joins
+
+To realize low-level operations on two inputs, applications can use 
`CoProcessFunction`. This
+function is bound to two different inputs and gets individual calls to 
`processElement1(...)` and
+`processElement2(...)` for records from the two different inputs.
+
+Implementing a low-level join typically follows this pattern:
+
+  - Create a state object for one input (or both)
+  - Update the state upon receiving elements from its input
+  - Upon receiving elements from the other input, probe the state and produce 
the joined result
+
+For example, you might be joining customer data to financial trades,
+while keeping state for the customer data. If you care about having
+complete and deterministic joins in the face of out-of-order events,
+you can use a timer to evaluate and emit the join for a trade when the
+watermark for the customer data stream has passed the time of that
+trade.
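+
+The following is a minimal sketch of this pattern, keeping only the latest
+customer record per key (the `Trade`, `Customer`, and `EnrichedTrade` types are
+hypothetical, and the timer-based completeness handling described above is
+omitted for brevity):
+
+{% highlight java %}
+public class EnrichmentFunction extends CoProcessFunction<Trade, Customer, EnrichedTrade> {
+
+    // latest customer record seen for the current key
+    private ValueState<Customer> customerState;
+
+    @Override
+    public void open(Configuration parameters) {
+        customerState = getRuntimeContext().getState(
+            new ValueStateDescriptor<>("customer", Customer.class));
+    }
+
+    @Override
+    public void processElement1(Trade trade, Context ctx, Collector<EnrichedTrade> out) throws Exception {
+        // probe the state built from the other input and emit the joined result
+        Customer customer = customerState.value();
+        if (customer != null) {
+            out.collect(new EnrichedTrade(trade, customer));
+        }
+    }
+
+    @Override
+    public void processElement2(Customer customer, Context ctx, Collector<EnrichedTrade> out) throws Exception {
+        // update the state upon receiving elements from this input
+        customerState.update(customer);
+    }
+}
+{% endhighlight %}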
+
+## Example
+
+The following example maintains counts per key, and emits a key/count pair 
whenever a minute passes (in event time) without an update for that key:
+
+  - The count, key, and last-modification-timestamp are stored in a `ValueState`, which is implicitly scoped by key.
+  - For each record, the `ProcessFunction` increments the counter and sets the last-modification timestamp.
+  - The function also schedules a callback one minute into the future (in event time).
+  - Upon each callback, it checks the callback's event time timestamp against the last-modification time of the stored count
+    and emits the key/count if they match (i.e., no further update occurred during that minute).
+
+<span class="label label-info">Note</span> This simple example could have been 
implemented with
+session windows. We use `ProcessFunction` here to illustrate the basic pattern 
it provides.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
+import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
+import org.apache.flink.util.Collector;
+
+
+// the source data stream
+DataStream<Tuple2<String, String>> stream = ...;
+
+// apply the process function onto a keyed stream
+DataStream<Tuple2<String, Long>> result = stream
+    .keyBy(0)
+    .process(new CountWithTimeoutFunction());
+
+/**
+ * The data type stored in the state
+ */
+public class CountWithTimestamp {
+
+    public String key;
+    public long count;
+    public long lastModified;
+}
+
+/**
+ * The implementation of the ProcessFunction that maintains the count and 
timeouts
+ */
+public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, 
String>, Tuple2<String, Long>> {
+
+    /** The state that is maintained by this process function */
+    private ValueState<CountWithTimestamp> state;
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        state = getRuntimeContext().getState(new 
ValueStateDescriptor<>("myState", CountWithTimestamp.class));
+    }
+
+    @Override
+    public void processElement(Tuple2<String, String> value, Context ctx, 
Collector<Tuple2<String, Long>> out)
+            throws Exception {
+
+        // retrieve the current count
+        CountWithTimestamp current = state.value();
+        if (current == null) {
+            current = new CountWithTimestamp();
+            current.key = value.f0;
+        }
+
+        // update the state's count
+        current.count++;
+
+        // set the state's timestamp to the record's assigned event time 
timestamp
+        current.lastModified = ctx.timestamp();
+
+        // write the state back
+        state.update(current);
+
+        // schedule the next timer 60 seconds from the current event time
+        ctx.timerService().registerEventTimeTimer(current.lastModified + 
60000);
+    }
+
+    @Override
+    public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<Tuple2<String, Long>> out)
+            throws Exception {
+
+        // get the state for the key that scheduled the timer
+        CountWithTimestamp result = state.value();
+
+        // check if this is an outdated timer or the latest timer
+        if (timestamp == result.lastModified + 60000) {
+            // emit the state on timeout
+            out.collect(new Tuple2<String, Long>(result.key, result.count));
+        }
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context
+import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
+import org.apache.flink.util.Collector
+
+// the source data stream
+val stream: DataStream[Tuple2[String, String]] = ...
+
+// apply the process function onto a keyed stream
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
+
+/**
+  * The data type stored in the state
+  */
+case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
+
+/**
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class CountWithTimeoutFunction extends ProcessFunction[(String, String), 
(String, Long)] {
+
+  /** The state that is maintained by this process function */
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", 
classOf[CountWithTimestamp]))
+
+
+  override def processElement(value: (String, String), ctx: Context, out: 
Collector[(String, Long)]): Unit = {
+    // initialize or retrieve/update the state
+
+    val current: CountWithTimestamp = state.value match {
+      case null =>
+        CountWithTimestamp(value._1, 1, ctx.timestamp)
+      case CountWithTimestamp(key, count, _) =>
+        CountWithTimestamp(key, count + 1, ctx.timestamp)
+    }
+
+    // write the state back
+    state.update(current)
+
+    // schedule the next timer 60 seconds from the current event time
+    ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
+  }
+
+  override def onTimer(timestamp: Long, ctx: OnTimerContext, out: 
Collector[(String, Long)]): Unit = {
+    state.value match {
+      case CountWithTimestamp(key, count, lastModified) if (timestamp == 
lastModified + 60000) =>
+        out.collect((key, count))
+      case _ =>
+    }
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
