[FLINK-7370][docs] rework the operator documentation structure

- create category `Streaming/Operators`
- move `Streaming/Overview/DataStream Transformations` to `Streaming/Operators/Overview`
- move `ProcessFunction`, `Windows`, and `Async IO` to `Streaming/Operators`
- update previous links in the documentation
- create any necessary redirects for old URLs
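
The redirect pages mentioned above (e.g. the new `docs/redirects/windows_2.md` in the diffstat) follow the Jekyll redirect convention already used in the Flink docs. As a sketch only — the titles and paths below are illustrative, not taken from this commit — such a page is just front matter with no body:

```yaml
---
# Hypothetical redirect page, modeled on existing files under docs/redirects/;
# the actual field values in this commit may differ.
title: "Windows"
layout: redirect                        # Jekyll layout that emits the redirect
redirect: /dev/stream/windows.html      # new canonical location
permalink: /apis/streaming/windows.html # old URL being preserved
---
```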


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cafa45e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cafa45e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cafa45e2

Branch: refs/heads/master
Commit: cafa45e2ff97dddf807859ae8e22e694f2630783
Parents: ff70cc3
Author: Nico Kruber <n...@data-artisans.com>
Authored: Fri Aug 4 10:56:58 2017 +0200
Committer: twalthr <twal...@apache.org>
Committed: Wed Aug 9 13:56:43 2017 +0200

----------------------------------------------------------------------
 docs/concepts/programming-model.md  |    2 +-
 docs/dev/datastream_api.md          | 1141 +----------------------------
 docs/dev/stream/asyncio.md          |    2 +-
 docs/dev/stream/operators.md        | 1169 ++++++++++++++++++++++++++++++
 docs/dev/stream/process_function.md |    2 +-
 docs/dev/stream/windows.md          | 1039 ++++++++++++++++++++++++++
 docs/dev/windows.md                 | 1039 --------------------------
 docs/redirects/windows.md           |    2 +-
 docs/redirects/windows_2.md         |   24 +
 9 files changed, 2239 insertions(+), 2181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cafa45e2/docs/concepts/programming-model.md
----------------------------------------------------------------------
diff --git a/docs/concepts/programming-model.md b/docs/concepts/programming-model.md
index fd5ebee..926fdd7 100644
--- a/docs/concepts/programming-model.md
+++ b/docs/concepts/programming-model.md
@@ -82,7 +82,7 @@ Often there is a one-to-one correspondence between the transformations in the pr
 in the dataflow. Sometimes, however, one transformation may consist of multiple transformation operators.
 
 Sources and sinks are documented in the [streaming connectors](../dev/connectors/index.html) and [batch connectors](../dev/batch/connectors.html) docs.
-Transformations are documented in [DataStream transformations](../dev/datastream_api.html#datastream-transformations) and [DataSet transformations](../dev/batch/dataset_transformations.html).
+Transformations are documented in [DataStream operators]({{ site.baseurl }}/dev/stream/operators.html) and [DataSet transformations](../dev/batch/dataset_transformations.html).
 
 {% top %}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cafa45e2/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index 8b3899b..b7f02ef 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -38,7 +38,7 @@ to the basic concepts of the Flink API.
 In order to create your own Flink DataStream program, we encourage you to start with
 [anatomy of a Flink Program]({{ site.baseurl }}/dev/api_concepts.html#anatomy-of-a-flink-program)
 and gradually add your own
-[transformations](#datastream-transformations). The remaining sections act as references for additional
+[stream transformations]({{ site.baseurl }}/dev/stream/operators.html). The remaining sections act as references for additional
 operations and advanced features.
 
 
@@ -138,1143 +138,8 @@ word count program. If you want to see counts greater than 1, type the same word
 DataStream Transformations
 --------------------------
 
-Data transformations transform one or more DataStreams into a new DataStream. 
Programs can combine
-multiple transformations into sophisticated topologies.
-
-This section gives a description of all the available transformations.
-
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-          <td><strong>Map</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Takes one element and produces one element. A map function that 
doubles the values of the input stream:</p>
-    {% highlight java %}
-DataStream<Integer> dataStream = //...
-dataStream.map(new MapFunction<Integer, Integer>() {
-    @Override
-    public Integer map(Integer value) throws Exception {
-        return 2 * value;
-    }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-
-        <tr>
-          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Takes one element and produces zero, one, or more elements. A 
flatmap function that splits sentences to words:</p>
-    {% highlight java %}
-dataStream.flatMap(new FlatMapFunction<String, String>() {
-    @Override
-    public void flatMap(String value, Collector<String> out)
-        throws Exception {
-        for(String word: value.split(" ")){
-            out.collect(word);
-        }
-    }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Evaluates a boolean function for each element and retains those 
for which the function returns true.
-            A filter that filters out zero values:
-            </p>
-    {% highlight java %}
-dataStream.filter(new FilterFunction<Integer>() {
-    @Override
-    public boolean filter(Integer value) throws Exception {
-        return value != 0;
-    }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
-          <td>
-            <p>Logically partitions a stream into disjoint partitions, each 
partition containing elements of the same key.
-            Internally, this is implemented with hash partitioning. See <a 
href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys</a> on how 
to specify keys.
-            This transformation returns a KeyedStream.</p>
-    {% highlight java %}
-dataStream.keyBy("someKey") // Key by field "someKey"
-dataStream.keyBy(0) // Key by the first element of a Tuple
-    {% endhighlight %}
-            <p>
-            <span class="label label-danger">Attention</span> 
-            A type <strong>cannot be a key</strong> if:
-           <ol> 
-           <li> it is a POJO type but does not override the 
<em>hashCode()</em> method and 
-           relies on the <em>Object.hashCode()</em> implementation.</li>
-           <li> it is an array of any type.</li>
-           </ol>
-           </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
-          <td>
-            <p>A "rolling" reduce on a keyed data stream. Combines the current 
element with the last reduced value and
-            emits the new value.
-                    <br/>
-               <br/>
-            A reduce function that creates a stream of partial sums:</p>
-            {% highlight java %}
-keyedStream.reduce(new ReduceFunction<Integer>() {
-    @Override
-    public Integer reduce(Integer value1, Integer value2)
-    throws Exception {
-        return value1 + value2;
-    }
-});
-            {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
-          <td>
-          <p>A "rolling" fold on a keyed data stream with an initial value.
-          Combines the current element with the last folded value and
-          emits the new value.
-          <br/>
-          <br/>
-          <p>A fold function that, when applied on the sequence (1,2,3,4,5),
-          emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
-          {% highlight java %}
-DataStream<String> result =
-  keyedStream.fold("start", new FoldFunction<Integer, String>() {
-    @Override
-    public String fold(String current, Integer value) {
-        return current + "-" + value;
-    }
-  });
-          {% endhighlight %}
-          </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Aggregations</strong><br>KeyedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Rolling aggregations on a keyed data stream. The difference 
between min
-           and minBy is that min returns the minimum value, whereas minBy 
returns
-           the element that has the minimum value in this field (same for max 
and maxBy).</p>
-    {% highlight java %}
-keyedStream.sum(0);
-keyedStream.sum("key");
-keyedStream.min(0);
-keyedStream.min("key");
-keyedStream.max(0);
-keyedStream.max("key");
-keyedStream.minBy(0);
-keyedStream.minBy("key");
-keyedStream.maxBy(0);
-keyedStream.maxBy("key");
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
-          <td>
-            <p>Windows can be defined on already partitioned KeyedStreams. 
Windows group the data in each
-            key according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
-            See <a href="windows.html">windows</a> for a complete description 
of windows.
-    {% highlight java %}
-dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // 
Last 5 seconds of data
-    {% endhighlight %}
-        </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>WindowAll</strong><br>DataStream &rarr; 
AllWindowedStream</td>
-          <td>
-              <p>Windows can be defined on regular DataStreams. Windows group 
all the stream events
-              according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
-              See <a href="windows.html">windows</a> for a complete 
description of windows.</p>
-              <p><strong>WARNING:</strong> This is in many cases a 
<strong>non-parallel</strong> transformation. All records will be
-               gathered in one task for the windowAll operator.</p>
-  {% highlight java %}
-dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 
seconds of data
-  {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Apply</strong><br>WindowedStream &rarr; 
DataStream<br>AllWindowedStream &rarr; DataStream</td>
-          <td>
-            <p>Applies a general function to the window as a whole. Below is a 
function that manually sums the elements of a window.</p>
-            <p><strong>Note:</strong> If you are using a windowAll 
transformation, you need to use an AllWindowFunction instead.</p>
-    {% highlight java %}
-windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
-    public void apply (Tuple tuple,
-            Window window,
-            Iterable<Tuple2<String, Integer>> values,
-            Collector<Integer> out) throws Exception {
-        int sum = 0;
-        for (Tuple2<String, Integer> t: values) {
-            sum += t.f1;
-        }
-        out.collect (new Integer(sum));
-    }
-});
-
-// applying an AllWindowFunction on non-keyed window stream
-allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
-    public void apply (Window window,
-            Iterable<Tuple2<String, Integer>> values,
-            Collector<Integer> out) throws Exception {
-        int sum = 0;
-        for (Tuple2<String, Integer> t: values) {
-            sum += t.f1;
-        }
-        out.collect (new Integer(sum));
-    }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Applies a functional reduce function to the window and returns 
the reduced value.</p>
-    {% highlight java %}
-windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
-    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, 
Tuple2<String, Integer> value2) throws Exception {
-        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
-    }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Fold</strong><br>WindowedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Applies a functional fold function to the window and returns 
the folded value.
-               The example function, when applied on the sequence (1,2,3,4,5),
-               folds the sequence into the string "start-1-2-3-4-5":</p>
-    {% highlight java %}
-windowedStream.fold("start", new FoldFunction<Integer, String>() {
-    public String fold(String current, Integer value) {
-        return current + "-" + value;
-    }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Aggregations on windows</strong><br>WindowedStream 
&rarr; DataStream</td>
-          <td>
-            <p>Aggregates the contents of a window. The difference between min
-           and minBy is that min returns the minimum value, whereas minBy returns
-           the element that has the minimum value in this field (same for max and maxBy).</p>
-    {% highlight java %}
-windowedStream.sum(0);
-windowedStream.sum("key");
-windowedStream.min(0);
-windowedStream.min("key");
-windowedStream.max(0);
-windowedStream.max("key");
-windowedStream.minBy(0);
-windowedStream.minBy("key");
-windowedStream.maxBy(0);
-windowedStream.maxBy("key");
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
-          <td>
-            <p>Union of two or more data streams, creating a new stream containing all the
-            elements from all the streams. Note: If you union a data stream with itself you
-            will get each element twice in the resulting stream.</p>
-    {% highlight java %}
-dataStream.union(otherStream1, otherStream2, ...);
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>Join two data streams on a given key and a common window.</p>
-    {% highlight java %}
-dataStream.join(otherStream)
-    .where(<key selector>).equalTo(<key selector>)
-    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
-    .apply (new JoinFunction () {...});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>Cogroups two data streams on a given key and a common 
window.</p>
-    {% highlight java %}
-dataStream.coGroup(otherStream)
-    .where(0).equalTo(1)
-    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
-    .apply (new CoGroupFunction () {...});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; 
ConnectedStreams</td>
-          <td>
-            <p>"Connects" two data streams retaining their types, allowing for shared state between
-            the two streams.</p>
-    {% highlight java %}
-DataStream<Integer> someStream = //...
-DataStream<String> otherStream = //...
-
-ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; 
DataStream</td>
-          <td>
-            <p>Similar to map and flatMap on a connected data stream</p>
-    {% highlight java %}
-connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
-    @Override
-    public Boolean map1(Integer value) {
-        return true;
-    }
-
-    @Override
-    public Boolean map2(String value) {
-        return false;
-    }
-});
-connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
-
-   @Override
-   public void flatMap1(Integer value, Collector<String> out) {
-       out.collect(value.toString());
-   }
-
-   @Override
-   public void flatMap2(String value, Collector<String> out) {
-       for (String word: value.split(" ")) {
-         out.collect(word);
-       }
-   }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
-          <td>
-            <p>
-                Split the stream into two or more streams according to some 
criterion.
-                {% highlight java %}
-SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
-    @Override
-    public Iterable<String> select(Integer value) {
-        List<String> output = new ArrayList<String>();
-        if (value % 2 == 0) {
-            output.add("even");
-        }
-        else {
-            output.add("odd");
-        }
-        return output;
-    }
-});
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
-          <td>
-            <p>
-                Select one or more streams from a split stream.
-                {% highlight java %}
-SplitStream<Integer> split;
-DataStream<Integer> even = split.select("even");
-DataStream<Integer> odd = split.select("odd");
-DataStream<Integer> all = split.select("even","odd");
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream 
&rarr; DataStream</td>
-          <td>
-            <p>
-                Creates a "feedback" loop in the flow, by redirecting the 
output of one operator
-                to some previous operator. This is especially useful for 
defining algorithms that
-                continuously update a model. The following code starts with a 
stream and applies
-               the iteration body continuously. Elements that are greater than 
0 are sent back
-               to the feedback channel, and the rest of the elements are 
forwarded downstream.
-               See <a href="#iterations">iterations</a> for a complete 
description.
-                {% highlight java %}
-IterativeStream<Long> iteration = initialStream.iterate();
-DataStream<Long> iterationBody = iteration.map (/*do something*/);
-DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
-    @Override
-    public boolean filter(Long value) throws Exception {
-        return value > 0;
-    }
-});
-iteration.closeWith(feedback);
-DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
-    @Override
-    public boolean filter(Long value) throws Exception {
-        return value <= 0;
-    }
-});
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>
-                Extracts timestamps from records in order to work with windows
-                that use event time semantics. See <a href="{{ site.baseurl 
}}/dev/event_time.html">Event Time</a>.
-                {% highlight java %}
-stream.assignTimestamps (new TimeStampExtractor() {...});
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-  </tbody>
-</table>
-
-</div>
-
-<div data-lang="scala" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-          <td><strong>Map</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Takes one element and produces one element. A map function that 
doubles the values of the input stream:</p>
-    {% highlight scala %}
-dataStream.map { x => x * 2 }
-    {% endhighlight %}
-          </td>
-        </tr>
-
-        <tr>
-          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Takes one element and produces zero, one, or more elements. A 
flatmap function that splits sentences to words:</p>
-    {% highlight scala %}
-dataStream.flatMap { str => str.split(" ") }
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Evaluates a boolean function for each element and retains those 
for which the function returns true.
-            A filter that filters out zero values:
-            </p>
-    {% highlight scala %}
-dataStream.filter { _ != 0 }
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
-          <td>
-            <p>Logically partitions a stream into disjoint partitions, each 
partition containing elements of the same key.
-            Internally, this is implemented with hash partitioning. See <a 
href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys</a> on how 
to specify keys.
-            This transformation returns a KeyedStream.</p>
-    {% highlight scala %}
-dataStream.keyBy("someKey") // Key by field "someKey"
-dataStream.keyBy(0) // Key by the first element of a Tuple
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
-          <td>
-            <p>A "rolling" reduce on a keyed data stream. Combines the current 
element with the last reduced value and
-            emits the new value.
-                    <br/>
-               <br/>
-            A reduce function that creates a stream of partial sums:</p>
-            {% highlight scala %}
-keyedStream.reduce { _ + _ }
-            {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
-          <td>
-          <p>A "rolling" fold on a keyed data stream with an initial value.
-          Combines the current element with the last folded value and
-          emits the new value.
-          <br/>
-          <br/>
-          <p>A fold function that, when applied on the sequence (1,2,3,4,5),
-          emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
-          {% highlight scala %}
-val result: DataStream[String] =
-    keyedStream.fold("start")((str, i) => { str + "-" + i })
-          {% endhighlight %}
-          </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Aggregations</strong><br>KeyedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Rolling aggregations on a keyed data stream. The difference between min
-           and minBy is that min returns the minimum value, whereas minBy returns
-           the element that has the minimum value in this field (same for max and maxBy).</p>
-    {% highlight scala %}
-keyedStream.sum(0)
-keyedStream.sum("key")
-keyedStream.min(0)
-keyedStream.min("key")
-keyedStream.max(0)
-keyedStream.max("key")
-keyedStream.minBy(0)
-keyedStream.minBy("key")
-keyedStream.maxBy(0)
-keyedStream.maxBy("key")
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
-          <td>
-            <p>Windows can be defined on already partitioned KeyedStreams. 
Windows group the data in each
-            key according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
-            See <a href="windows.html">windows</a> for a description of 
windows.
-    {% highlight scala %}
-dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // 
Last 5 seconds of data
-    {% endhighlight %}
-        </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>WindowAll</strong><br>DataStream &rarr; 
AllWindowedStream</td>
-          <td>
-              <p>Windows can be defined on regular DataStreams. Windows group 
all the stream events
-              according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
-              See <a href="windows.html">windows</a> for a complete 
description of windows.</p>
-              <p><strong>WARNING:</strong> This is in many cases a 
<strong>non-parallel</strong> transformation. All records will be
-               gathered in one task for the windowAll operator.</p>
-  {% highlight scala %}
-dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 
seconds of data
-  {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Apply</strong><br>WindowedStream &rarr; 
DataStream<br>AllWindowedStream &rarr; DataStream</td>
-          <td>
-            <p>Applies a general function to the window as a whole. Below is a 
function that manually sums the elements of a window.</p>
-            <p><strong>Note:</strong> If you are using a windowAll 
transformation, you need to use an AllWindowFunction instead.</p>
-    {% highlight scala %}
-windowedStream.apply { WindowFunction }
-
-// applying an AllWindowFunction on non-keyed window stream
-allWindowedStream.apply { AllWindowFunction }
-
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Applies a functional reduce function to the window and returns 
the reduced value.</p>
-    {% highlight scala %}
-windowedStream.reduce { _ + _ }
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Fold</strong><br>WindowedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Applies a functional fold function to the window and returns 
the folded value.
-               The example function, when applied on the sequence (1,2,3,4,5),
-               folds the sequence into the string "start-1-2-3-4-5":</p>
-          {% highlight scala %}
-val result: DataStream[String] =
-    windowedStream.fold("start", (str, i) => { str + "-" + i })
-          {% endhighlight %}
-          </td>
-       </tr>
-        <tr>
-          <td><strong>Aggregations on windows</strong><br>WindowedStream 
&rarr; DataStream</td>
-          <td>
-            <p>Aggregates the contents of a window. The difference between min
-           and minBy is that min returns the minimum value, whereas minBy 
returns
-           the element that has the minimum value in this field (same for max 
and maxBy).</p>
-    {% highlight scala %}
-windowedStream.sum(0)
-windowedStream.sum("key")
-windowedStream.min(0)
-windowedStream.min("key")
-windowedStream.max(0)
-windowedStream.max("key")
-windowedStream.minBy(0)
-windowedStream.minBy("key")
-windowedStream.maxBy(0)
-windowedStream.maxBy("key")
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
-          <td>
-            <p>Union of two or more data streams, creating a new stream containing all the
-            elements from all the streams. Note: If you union a data stream with itself you
-            will get each element twice in the resulting stream.</p>
-    {% highlight scala %}
-dataStream.union(otherStream1, otherStream2, ...)
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>Join two data streams on a given key and a common window.</p>
-    {% highlight scala %}
-dataStream.join(otherStream)
-    .where(<key selector>).equalTo(<key selector>)
-    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
-    .apply { ... }
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>Cogroups two data streams on a given key and a common 
window.</p>
-    {% highlight scala %}
-dataStream.coGroup(otherStream)
-    .where(0).equalTo(1)
-    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
-    .apply {}
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; 
ConnectedStreams</td>
-          <td>
-            <p>"Connects" two data streams retaining their types, allowing for 
shared state between
-            the two streams.</p>
-    {% highlight scala %}
-someStream : DataStream[Int] = ...
-otherStream : DataStream[String] = ...
-
-val connectedStreams = someStream.connect(otherStream)
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; 
DataStream</td>
-          <td>
-            <p>Similar to map and flatMap on a connected data stream</p>
-    {% highlight scala %}
-connectedStreams.map(
-    (_ : Int) => true,
-    (_ : String) => false
-)
-connectedStreams.flatMap(
-    (num : Int) => List(num.toString),
-    (str : String) => str.split(" ")
-)
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
-          <td>
-            <p>
-                Split the stream into two or more streams according to some 
criterion.
-                {% highlight scala %}
-val split = someDataStream.split(
-  (num: Int) =>
-    (num % 2) match {
-      case 0 => List("even")
-      case 1 => List("odd")
-    }
-)
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
-          <td>
-            <p>
-                Select one or more streams from a split stream.
-                {% highlight scala %}
-
-val even = split select "even"
-val odd = split select "odd"
-val all = split.select("even","odd")
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream  
&rarr; DataStream</td>
-          <td>
-            <p>
-                Creates a "feedback" loop in the flow, by redirecting the 
output of one operator
-                to some previous operator. This is especially useful for 
defining algorithms that
-                continuously update a model. The following code starts with a 
stream and applies
-               the iteration body continuously. Elements that are greater than 
0 are sent back
-               to the feedback channel, and the rest of the elements are 
forwarded downstream.
-               See <a href="#iterations">iterations</a> for a complete 
description.
-                {% highlight scala %}
-initialStream.iterate {
-  iteration => {
-    val iterationBody = iteration.map {/*do something*/}
-    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
-  }
-}
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>
-                Extracts timestamps from records in order to work with windows
-                that use event time semantics.
-                See <a href="{{ site.baseurl 
}}/apis/streaming/event_time.html">Event Time</a>.
-                {% highlight scala %}
-stream.assignTimestamps { timestampExtractor }
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-  </tbody>
-</table>
-
-Extraction from tuples, case classes and collections via anonymous pattern 
matching, like the following:
-{% highlight scala %}
-val data: DataStream[(Int, String, Double)] = // [...]
-data.map {
-  case (id, name, temperature) => // [...]
-}
-{% endhighlight %}
-is not supported by the API out-of-the-box. To use this feature, you should 
use a <a href="scala_api_extensions.html">Scala API extension</a>.
-
-
-</div>
-</div>
-
-The following transformations are available on data streams of Tuples:
-
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td><strong>Project</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>Selects a subset of fields from the tuples
-{% highlight java %}
-DataStream<Tuple3<Integer, Double, String>> in = // [...]
-DataStream<Tuple2<String, Integer>> out = in.project(2,0);
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-</div>
-
-
-### Physical partitioning
-
-Flink also gives low-level control (if desired) on the exact stream 
partitioning after a transformation,
-via the following functions.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td><strong>Custom partitioning</strong><br>DataStream &rarr; 
DataStream</td>
-      <td>
-        <p>
-            Uses a user-defined Partitioner to select the target task for each 
element.
-            {% highlight java %}
-dataStream.partitionCustom(partitioner, "someKey");
-dataStream.partitionCustom(partitioner, 0);
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-     <td><strong>Random partitioning</strong><br>DataStream &rarr; 
DataStream</td>
-     <td>
-       <p>
-            Partitions elements randomly according to a uniform distribution.
-            {% highlight java %}
-dataStream.shuffle();
-            {% endhighlight %}
-       </p>
-     </td>
-   </tr>
-   <tr>
-      <td><strong>Rebalancing (Round-robin 
partitioning)</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>
-            Partitions elements round-robin, creating equal load per 
partition. Useful for performance
-            optimization in the presence of data skew.
-            {% highlight java %}
-dataStream.rebalance();
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-    <tr>
-      <td><strong>Rescaling</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>
-            Partitions elements, round-robin, to a subset of downstream 
operations. This is
-            useful if you want to have pipelines where you, for example, fan 
out from
-            each parallel instance of a source to a subset of several mappers 
to distribute load
-            but don't want the full rebalance that rebalance() would incur. 
This would require only
-            local data transfers instead of transferring data over network, 
depending on
-            other configuration values such as the number of slots of 
TaskManagers.
-        </p>
-        <p>
-            The subset of downstream operations to which the upstream 
operation sends
-            elements depends on the degree of parallelism of both the upstream 
and downstream operation.
-            For example, if the upstream operation has parallelism 2 and the 
downstream operation
-            has parallelism 6, then one upstream operation would distribute 
elements to three
-            downstream operations while the other upstream operation would 
distribute to the other
-            three downstream operations. If, on the other hand, the downstream 
operation has parallelism
-            2 while the upstream operation has parallelism 6 then three 
upstream operations would
-            distribute to one downstream operation while the other three 
upstream operations would
-            distribute to the other downstream operation.
-        </p>
-        <p>
-            In cases where the different parallelisms are not multiples of 
each other one or several
-            downstream operations will have a differing number of inputs from 
upstream operations.
-        </p>
-        <p>
-            Please see this figure for a visualization of the connection 
pattern in the above
-            example:
-        </p>
-
-        <div style="text-align: center">
-            <img src="{{ site.baseurl }}/fig/rescale.svg" alt="Checkpoint 
barriers in data streams" />
-            </div>
-
-
-        <p>
-                    {% highlight java %}
-dataStream.rescale();
-            {% endhighlight %}
-
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>
-            Broadcasts elements to every partition.
-            {% highlight java %}
-dataStream.broadcast();
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-
-<div data-lang="scala" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td><strong>Custom partitioning</strong><br>DataStream &rarr; 
DataStream</td>
-      <td>
-        <p>
-            Uses a user-defined Partitioner to select the target task for each 
element.
-            {% highlight scala %}
-dataStream.partitionCustom(partitioner, "someKey")
-dataStream.partitionCustom(partitioner, 0)
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-     <td><strong>Random partitioning</strong><br>DataStream &rarr; 
DataStream</td>
-     <td>
-       <p>
-            Partitions elements randomly according to a uniform distribution.
-            {% highlight scala %}
-dataStream.shuffle()
-            {% endhighlight %}
-       </p>
-     </td>
-   </tr>
-   <tr>
-      <td><strong>Rebalancing (Round-robin 
partitioning)</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>
-            Partitions elements round-robin, creating equal load per 
partition. Useful for performance
-            optimization in the presence of data skew.
-            {% highlight scala %}
-dataStream.rebalance()
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-    <tr>
-      <td><strong>Rescaling</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>
-            Partitions elements, round-robin, to a subset of downstream 
operations. This is
-            useful if you want to have pipelines where you, for example, fan 
out from
-            each parallel instance of a source to a subset of several mappers 
to distribute load
-            but don't want the full rebalance that rebalance() would incur. 
This would require only
-            local data transfers instead of transferring data over network, 
depending on
-            other configuration values such as the number of slots of 
TaskManagers.
-        </p>
-        <p>
-            The subset of downstream operations to which the upstream 
operation sends
-            elements depends on the degree of parallelism of both the upstream 
and downstream operation.
-            For example, if the upstream operation has parallelism 2 and the 
downstream operation
-            has parallelism 4, then one upstream operation would distribute 
elements to two
-            downstream operations while the other upstream operation would 
distribute to the other
-            two downstream operations. If, on the other hand, the downstream 
operation has parallelism
-            2 while the upstream operation has parallelism 4 then two upstream 
operations would
-            distribute to one downstream operation while the other two 
upstream operations would
-            distribute to the other downstream operations.
-        </p>
-        <p>
-            In cases where the different parallelisms are not multiples of 
each other one or several
-            downstream operations will have a differing number of inputs from 
upstream operations.
-
-        </p>
-        </p>
-            Please see this figure for a visualization of the connection 
pattern in the above
-            example:
-        </p>
-
-        <div style="text-align: center">
-            <img src="{{ site.baseurl }}/fig/rescale.svg" alt="Checkpoint 
barriers in data streams" />
-            </div>
-
-
-        <p>
-                    {% highlight java %}
-dataStream.rescale()
-            {% endhighlight %}
-
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>
-            Broadcasts elements to every partition.
-            {% highlight scala %}
-dataStream.broadcast()
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-</div>
-
-### Task chaining and resource groups
-
-Chaining two subsequent transformations means co-locating them within the same 
thread for better
-performance. Flink by default chains operators if this is possible (e.g., two 
subsequent map
-transformations). The API gives fine-grained control over chaining if desired:
-
-Use `StreamExecutionEnvironment.disableOperatorChaining()` if you want to 
disable chaining in
-the whole job. For more fine grained control, the following functions are 
available. Note that
-these functions can only be used right after a DataStream transformation as 
they refer to the
-previous transformation. For example, you can use 
`someStream.map(...).startNewChain()`, but
-you cannot use `someStream.startNewChain()`.
-
-A resource group is a slot in Flink, see
-[slots]({{site.baseurl}}/ops/config.html#configuring-taskmanager-processing-slots).
 You can
-manually isolate operators in separate slots if desired.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td>Start new chain</td>
-      <td>
-        <p>Begin a new chain, starting with this operator. The two
-       mappers will be chained, and filter will not be chained to
-       the first mapper.
-{% highlight java %}
-someStream.filter(...).map(...).startNewChain().map(...);
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td>Disable chaining</td>
-      <td>
-        <p>Do not chain the map operator
-{% highlight java %}
-someStream.map(...).disableChaining();
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-    <tr>
-      <td>Set slot sharing group</td>
-      <td>
-        <p>Set the slot sharing group of an operation. Flink will put 
operations with the same
-        slot sharing group into the same slot while keeping operations that 
don't have the
-        slot sharing group in other slots. This can be used to isolate slots. 
The slot sharing
-        group is inherited from input operations if all input operations are 
in the same slot
-        sharing group.
-        The name of the default slot sharing group is "default", operations 
can explicitly
-        be put into this group by calling slotSharingGroup("default").
-{% highlight java %}
-someStream.filter(...).slotSharingGroup("name");
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-
-<div data-lang="scala" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td>Start new chain</td>
-      <td>
-        <p>Begin a new chain, starting with this operator. The two
-       mappers will be chained, and filter will not be chained to
-       the first mapper.
-{% highlight scala %}
-someStream.filter(...).map(...).startNewChain().map(...)
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td>Disable chaining</td>
-      <td>
-        <p>Do not chain the map operator
-{% highlight scala %}
-someStream.map(...).disableChaining()
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  <tr>
-      <td>Set slot sharing group</td>
-      <td>
-        <p>Set the slot sharing group of an operation. Flink will put 
operations with the same
-        slot sharing group into the same slot while keeping operations that 
don't have the
-        slot sharing group in other slots. This can be used to isolate slots. 
The slot sharing
-        group is inherited from input operations if all input operations are 
in the same slot
-        sharing group.
-        The name of the default slot sharing group is "default", operations 
can explicitly
-        be put into this group by calling slotSharingGroup("default").
-{% highlight java %}
-someStream.filter(...).slotSharingGroup("name")
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-</div>
-
-
-{% top %}
+Moved. Please see [operators]({{ site.baseurl }}/dev/stream/operators.html) 
for an overview of the
+available stream transformations.
 
 Data Sources
 ------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cafa45e2/docs/dev/stream/asyncio.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/asyncio.md b/docs/dev/stream/asyncio.md
index c4414b4..ec9c8ba 100644
--- a/docs/dev/stream/asyncio.md
+++ b/docs/dev/stream/asyncio.md
@@ -1,7 +1,7 @@
 ---
 title: "Asynchronous I/O for External Data Access"
 nav-title: "Async I/O"
-nav-parent_id: streaming
+nav-parent_id: operators
 nav-pos: 60
 ---
 <!--

http://git-wip-us.apache.org/repos/asf/flink/blob/cafa45e2/docs/dev/stream/operators.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators.md b/docs/dev/stream/operators.md
new file mode 100644
index 0000000..70bd9ae
--- /dev/null
+++ b/docs/dev/stream/operators.md
@@ -0,0 +1,1169 @@
+---
+title: "Operators"
+nav-id: operators
+nav-show_overview: true
+nav-parent_id: streaming
+nav-pos: 9
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Operators transform one or more DataStreams into a new DataStream. Programs 
can combine
+multiple transformations into sophisticated topologies.
+
+This section gives a description of all the available transformations, the 
effective physical
+partitioning after applying those as well as insights into Flink's operator 
chaining.
+
+* toc
+{:toc}
+
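As an aside, the map/filter/reduce semantics used throughout this section can be tried out in a dependency-free form with plain `java.util.stream` (an analogy only, not the Flink API; Flink operators apply the same ideas to unbounded, distributed streams):

```java
import java.util.stream.Stream;

public class TransformationSemantics {
    public static void main(String[] args) {
        // map: one element in, one element out (here: doubling)
        // filter: keep only elements for which the predicate is true
        // reduce: roll the remaining elements up into a single value
        int sum = Stream.of(1, 2, 3, 4, 5)
                .map(x -> 2 * x)          // 2, 4, 6, 8, 10
                .filter(x -> x > 4)       // 6, 8, 10
                .reduce(0, Integer::sum); // 24
        System.out.println(sum);
    }
}
```

The same chaining style carries over to a `DataStream`, except that results are emitted continuously instead of being materialized once.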
+# DataStream Transformations
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+          <td><strong>Map</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces one element. A map function that 
doubles the values of the input stream:</p>
+    {% highlight java %}
+DataStream<Integer> dataStream = //...
+dataStream.map(new MapFunction<Integer, Integer>() {
+    @Override
+    public Integer map(Integer value) throws Exception {
+        return 2 * value;
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+
+        <tr>
+          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:</p>
+    {% highlight java %}
+dataStream.flatMap(new FlatMapFunction<String, String>() {
+    @Override
+    public void flatMap(String value, Collector<String> out)
+        throws Exception {
+        for(String word: value.split(" ")){
+            out.collect(word);
+        }
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Evaluates a boolean function for each element and retains those 
for which the function returns true.
+            A filter that filters out zero values:
+            </p>
+    {% highlight java %}
+dataStream.filter(new FilterFunction<Integer>() {
+    @Override
+    public boolean filter(Integer value) throws Exception {
+        return value != 0;
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
+          <td>
+            <p>Logically partitions a stream into disjoint partitions, each 
partition containing elements of the same key.
+            Internally, this is implemented with hash partitioning. See <a 
href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys</a> on how 
to specify keys.
+            This transformation returns a KeyedStream.</p>
+    {% highlight java %}
+dataStream.keyBy("someKey"); // Key by field "someKey"
+dataStream.keyBy(0); // Key by the first field of a Tuple
+    {% endhighlight %}
+            <p>
+            <span class="label label-danger">Attention</span>
+            A type <strong>cannot be a key</strong> if:
+           <ol>
+           <li> it is a POJO type but does not override the 
<em>hashCode()</em> method and
+           relies on the <em>Object.hashCode()</em> implementation.</li>
+           <li> it is an array of any type.</li>
+           </ol>
+           </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+            <p>A "rolling" reduce on a keyed data stream. Combines the current 
element with the last reduced value and
+            emits the new value.
+                    <br/>
+               <br/>
+            A reduce function that creates a stream of partial sums:</p>
+            {% highlight java %}
+keyedStream.reduce(new ReduceFunction<Integer>() {
+    @Override
+    public Integer reduce(Integer value1, Integer value2)
+    throws Exception {
+        return value1 + value2;
+    }
+});
+            {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+          <p>A "rolling" fold on a keyed data stream with an initial value.
+          Combines the current element with the last folded value and
+          emits the new value.
+          <br/>
+          <br/>
+          <p>A fold function that, when applied on the sequence (1,2,3,4,5),
+          emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
+          {% highlight java %}
+DataStream<String> result =
+  keyedStream.fold("start", new FoldFunction<Integer, String>() {
+    @Override
+    public String fold(String current, Integer value) {
+        return current + "-" + value;
+    }
+  });
+          {% endhighlight %}
+          </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Aggregations</strong><br>KeyedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Rolling aggregations on a keyed data stream. The difference 
between min
+           and minBy is that min returns the minimum value, whereas minBy 
returns
+           the element that has the minimum value in this field (same for max 
and maxBy).</p>
+    {% highlight java %}
+keyedStream.sum(0);
+keyedStream.sum("key");
+keyedStream.min(0);
+keyedStream.min("key");
+keyedStream.max(0);
+keyedStream.max("key");
+keyedStream.minBy(0);
+keyedStream.minBy("key");
+keyedStream.maxBy(0);
+keyedStream.maxBy("key");
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
+          <td>
+            <p>Windows can be defined on already partitioned KeyedStreams. 
Windows group the data in each
+            key according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
+            See <a href="windows.html">windows</a> for a complete description 
of windows.
+    {% highlight java %}
+dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // 
Last 5 seconds of data
+    {% endhighlight %}
+        </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>WindowAll</strong><br>DataStream &rarr; 
AllWindowedStream</td>
+          <td>
+              <p>Windows can be defined on regular DataStreams. Windows group 
all the stream events
+              according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
+              See <a href="windows.html">windows</a> for a complete 
description of windows.</p>
+              <p><strong>WARNING:</strong> This is in many cases a 
<strong>non-parallel</strong> transformation. All records will be
+               gathered in one task for the windowAll operator.</p>
+  {% highlight java %}
+dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 
seconds of data
+  {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Apply</strong><br>WindowedStream &rarr; 
DataStream<br>AllWindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a general function to the window as a whole. Below is a 
function that manually sums the elements of a window.</p>
+            <p><strong>Note:</strong> If you are using a windowAll 
transformation, you need to use an AllWindowFunction instead.</p>
+    {% highlight java %}
+windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, 
Tuple, Window>() {
+    public void apply (Tuple tuple,
+            Window window,
+            Iterable<Tuple2<String, Integer>> values,
+            Collector<Integer> out) throws Exception {
+        int sum = 0;
+        for (Tuple2<String, Integer> t : values) {
+            sum += t.f1;
+        }
+        out.collect(sum);
+    }
+});
+
+// applying an AllWindowFunction on non-keyed window stream
+allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, 
Integer, Window>() {
+    public void apply (Window window,
+            Iterable<Tuple2<String, Integer>> values,
+            Collector<Integer> out) throws Exception {
+        int sum = 0;
+        for (Tuple2<String, Integer> t : values) {
+            sum += t.f1;
+        }
+        out.collect(sum);
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Applies a functional reduce function to the window and returns 
the reduced value.</p>
+    {% highlight java %}
+windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
+    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, 
Tuple2<String, Integer> value2) throws Exception {
+        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Fold</strong><br>WindowedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Applies a functional fold function to the window and returns 
the folded value.
+               The example function, when applied on the sequence (1,2,3,4,5),
+               folds the sequence into the string "start-1-2-3-4-5":</p>
+    {% highlight java %}
+windowedStream.fold("start", new FoldFunction<Integer, String>() {
+    public String fold(String current, Integer value) {
+        return current + "-" + value;
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Aggregations on windows</strong><br>WindowedStream 
&rarr; DataStream</td>
+          <td>
+            <p>Aggregates the contents of a window. The difference between min
+           and minBy is that min returns the minimum value, whereas minBy returns
+           the element that has the minimum value in this field (same for max 
and maxBy).</p>
+    {% highlight java %}
+windowedStream.sum(0);
+windowedStream.sum("key");
+windowedStream.min(0);
+windowedStream.min("key");
+windowedStream.max(0);
+windowedStream.max("key");
+windowedStream.minBy(0);
+windowedStream.minBy("key");
+windowedStream.maxBy(0);
+windowedStream.maxBy("key");
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
+          <td>
+            <p>Union of two or more data streams creating a new stream 
containing all the elements from all the streams. Note: If you union a data 
stream
+            with itself you will get each element twice in the resulting 
stream.</p>
+    {% highlight java %}
+dataStream.union(otherStream1, otherStream2, ...);
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>Join two data streams on a given key and a common window.</p>
+    {% highlight java %}
+dataStream.join(otherStream)
+    .where(<key selector>).equalTo(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
+    .apply (new JoinFunction () {...});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>Cogroups two data streams on a given key and a common 
window.</p>
+    {% highlight java %}
+dataStream.coGroup(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
+    .apply (new CoGroupFunction () {...});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; 
ConnectedStreams</td>
+          <td>
+            <p>"Connects" two data streams retaining their types, allowing for shared state between
+            the two streams.</p>
+    {% highlight java %}
+DataStream<Integer> someStream = //...
+DataStream<String> otherStream = //...
+
+ConnectedStreams<Integer, String> connectedStreams = 
someStream.connect(otherStream);
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; 
DataStream</td>
+          <td>
+            <p>Similar to map and flatMap on a connected data stream.</p>
+    {% highlight java %}
+connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
+    @Override
+    public Boolean map1(Integer value) {
+        return true;
+    }
+
+    @Override
+    public Boolean map2(String value) {
+        return false;
+    }
+});
+connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
+
+   @Override
+   public void flatMap1(Integer value, Collector<String> out) {
+       out.collect(value.toString());
+   }
+
+   @Override
+   public void flatMap2(String value, Collector<String> out) {
+       for (String word: value.split(" ")) {
+         out.collect(word);
+       }
+   }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
+          <td>
+            <p>
+                Split the stream into two or more streams according to some 
criterion.
+                {% highlight java %}
+SplitStream<Integer> split = someDataStream.split(new 
OutputSelector<Integer>() {
+    @Override
+    public Iterable<String> select(Integer value) {
+        List<String> output = new ArrayList<String>();
+        if (value % 2 == 0) {
+            output.add("even");
+        }
+        else {
+            output.add("odd");
+        }
+        return output;
+    }
+});
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
+          <td>
+            <p>
+                Select one or more streams from a split stream.
+                {% highlight java %}
+SplitStream<Integer> split;
+DataStream<Integer> even = split.select("even");
+DataStream<Integer> odd = split.select("odd");
+DataStream<Integer> all = split.select("even","odd");
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream 
&rarr; DataStream</td>
+          <td>
+            <p>
+                Creates a "feedback" loop in the flow, by redirecting the 
output of one operator
+                to some previous operator. This is especially useful for 
defining algorithms that
+                continuously update a model. The following code starts with a 
stream and applies
+               the iteration body continuously. Elements that are greater than 
0 are sent back
+               to the feedback channel, and the rest of the elements are 
forwarded downstream.
+               See <a href="#iterations">iterations</a> for a complete 
description.
+                {% highlight java %}
+IterativeStream<Long> iteration = initialStream.iterate();
+DataStream<Long> iterationBody = iteration.map (/*do something*/);
+DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
+    @Override
+    public boolean filter(Long value) throws Exception {
+        return value > 0;
+    }
+});
+iteration.closeWith(feedback);
+DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
+    @Override
+    public boolean filter(Long value) throws Exception {
+        return value <= 0;
+    }
+});
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>
+                Extracts timestamps from records in order to work with windows
+                that use event time semantics. See <a href="{{ site.baseurl 
}}/dev/event_time.html">Event Time</a>.
+                {% highlight java %}
+stream.assignTimestamps(new TimestampExtractor() {...});
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+          <td><strong>Map</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces one element. A map function that 
doubles the values of the input stream:</p>
+    {% highlight scala %}
+dataStream.map { x => x * 2 }
+    {% endhighlight %}
+          </td>
+        </tr>
+
+        <tr>
+          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces zero, one, or more elements. A 
flatmap function that splits sentences to words:</p>
+    {% highlight scala %}
+dataStream.flatMap { str => str.split(" ") }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Evaluates a boolean function for each element and retains those 
for which the function returns true.
+            A filter that filters out zero values:
+            </p>
+    {% highlight scala %}
+dataStream.filter { _ != 0 }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
+          <td>
+            <p>Logically partitions a stream into disjoint partitions, each 
partition containing elements of the same key.
+            Internally, this is implemented with hash partitioning. See <a 
href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys</a> on how 
to specify keys.
+            This transformation returns a KeyedStream.</p>
+    {% highlight scala %}
+dataStream.keyBy("someKey") // Key by field "someKey"
+dataStream.keyBy(0) // Key by the first element of a Tuple
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+            <p>A "rolling" reduce on a keyed data stream. Combines the current 
element with the last reduced value and
+            emits the new value.
+                    <br/>
+               <br/>
+            A reduce function that creates a stream of partial sums:</p>
+            {% highlight scala %}
+keyedStream.reduce { _ + _ }
+            {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+          <p>A "rolling" fold on a keyed data stream with an initial value.
+          Combines the current element with the last folded value and
+          emits the new value.
+          <br/>
+          <br/>
+          <p>A fold function that, when applied on the sequence (1,2,3,4,5),
+          emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
+          {% highlight scala %}
+val result: DataStream[String] =
+    keyedStream.fold("start")((str, i) => { str + "-" + i })
+          {% endhighlight %}
+          </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Aggregations</strong><br>KeyedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Rolling aggregations on a keyed data stream. The difference 
between min
+           and minBy is that min returns the minimum value, whereas minBy 
returns
+           the element that has the minimum value in this field (same for max 
and maxBy).</p>
+    {% highlight scala %}
+keyedStream.sum(0)
+keyedStream.sum("key")
+keyedStream.min(0)
+keyedStream.min("key")
+keyedStream.max(0)
+keyedStream.max("key")
+keyedStream.minBy(0)
+keyedStream.minBy("key")
+keyedStream.maxBy(0)
+keyedStream.maxBy("key")
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
+          <td>
+            <p>Windows can be defined on already partitioned KeyedStreams. 
Windows group the data in each
+            key according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
+            See <a href="windows.html">windows</a> for a description of 
windows.
+    {% highlight scala %}
+dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // 
Last 5 seconds of data
+    {% endhighlight %}
+        </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>WindowAll</strong><br>DataStream &rarr; 
AllWindowedStream</td>
+          <td>
+              <p>Windows can be defined on regular DataStreams. Windows group 
all the stream events
+              according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
+              See <a href="windows.html">windows</a> for a complete 
description of windows.</p>
+              <p><strong>WARNING:</strong> This is in many cases a 
<strong>non-parallel</strong> transformation. All records will be
+               gathered in one task for the windowAll operator.</p>
+  {% highlight scala %}
+dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 
seconds of data
+  {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Apply</strong><br>WindowedStream &rarr; 
DataStream<br>AllWindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a general function to the window as a whole. Below is a 
function that manually sums the elements of a window.</p>
+            <p><strong>Note:</strong> If you are using a windowAll 
transformation, you need to use an AllWindowFunction instead.</p>
+    {% highlight scala %}
+windowedStream.apply { WindowFunction }
+
+// applying an AllWindowFunction on non-keyed window stream
+allWindowedStream.apply { AllWindowFunction }
+
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Applies a functional reduce function to the window and returns 
the reduced value.</p>
+    {% highlight scala %}
+windowedStream.reduce { _ + _ }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Fold</strong><br>WindowedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Applies a functional fold function to the window and returns 
the folded value.
+               The example function, when applied on the sequence (1,2,3,4,5),
+               folds the sequence into the string "start-1-2-3-4-5":</p>
+          {% highlight scala %}
+val result: DataStream[String] =
+    windowedStream.fold("start")((str, i) => { str + "-" + i })
+          {% endhighlight %}
+          </td>
+       </tr>
+        <tr>
+          <td><strong>Aggregations on windows</strong><br>WindowedStream 
&rarr; DataStream</td>
+          <td>
+            <p>Aggregates the contents of a window. The difference between min
+           and minBy is that min returns the minimum value, whereas minBy 
returns
+           the element that has the minimum value in this field (same for max 
and maxBy).</p>
+    {% highlight scala %}
+windowedStream.sum(0)
+windowedStream.sum("key")
+windowedStream.min(0)
+windowedStream.min("key")
+windowedStream.max(0)
+windowedStream.max("key")
+windowedStream.minBy(0)
+windowedStream.minBy("key")
+windowedStream.maxBy(0)
+windowedStream.maxBy("key")
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
+          <td>
+            <p>Union of two or more data streams creating a new stream 
containing all the elements from all the streams. Note: If you union a data 
stream
+            with itself you will get each element twice in the resulting 
stream.</p>
+    {% highlight scala %}
+dataStream.union(otherStream1, otherStream2, ...)
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>Join two data streams on a given key and a common window.</p>
+    {% highlight scala %}
+dataStream.join(otherStream)
+    .where(<key selector>).equalTo(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
+    .apply { ... }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>Cogroups two data streams on a given key and a common 
window.</p>
+    {% highlight scala %}
+dataStream.coGroup(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
+    .apply {}
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; 
ConnectedStreams</td>
+          <td>
+            <p>"Connects" two data streams retaining their types, allowing for 
shared state between
+            the two streams.</p>
+    {% highlight scala %}
+val someStream: DataStream[Int] = ...
+val otherStream: DataStream[String] = ...
+
+val connectedStreams = someStream.connect(otherStream)
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; 
DataStream</td>
+          <td>
+            <p>Similar to map and flatMap on a connected data stream.</p>
+    {% highlight scala %}
+connectedStreams.map(
+    (_ : Int) => true,
+    (_ : String) => false
+)
+connectedStreams.flatMap(
+    (_ : Int) => true,
+    (_ : String) => false
+)
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
+          <td>
+            <p>
+                Split the stream into two or more streams according to some 
criterion.
+                {% highlight scala %}
+val split = someDataStream.split(
+  (num: Int) =>
+    (num % 2) match {
+      case 0 => List("even")
+      case 1 => List("odd")
+    }
+)
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
+          <td>
+            <p>
+                Select one or more streams from a split stream.
+                {% highlight scala %}
+
+val even = split select "even"
+val odd = split select "odd"
+val all = split.select("even","odd")
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream  
&rarr; DataStream</td>
+          <td>
+            <p>
+                Creates a "feedback" loop in the flow, by redirecting the 
output of one operator
+                to some previous operator. This is especially useful for 
defining algorithms that
+                continuously update a model. The following code starts with a 
stream and applies
+               the iteration body continuously. Elements that are greater than 
0 are sent back
+               to the feedback channel, and the rest of the elements are 
forwarded downstream.
+               See <a href="#iterations">iterations</a> for a complete 
description.
+                {% highlight scala %}
+initialStream.iterate {
+  iteration => {
+    val iterationBody = iteration.map {/*do something*/}
+    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
+  }
+}
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>
+                Extracts timestamps from records in order to work with windows
+                that use event time semantics.
+                See <a href="{{ site.baseurl }}/dev/event_time.html">Event Time</a>.
+                {% highlight scala %}
+stream.assignTimestamps { timestampExtractor }
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+  </tbody>
+</table>
+
+Extraction from tuples, case classes and collections via anonymous pattern 
matching, like the following:
+{% highlight scala %}
+val data: DataStream[(Int, String, Double)] = // [...]
+data.map {
+  case (id, name, temperature) => // [...]
+}
+{% endhighlight %}
+is not supported by the API out-of-the-box. To use this feature, you should 
use a <a href="scala_api_extensions.html">Scala API extension</a>.
+
+
+</div>
+</div>
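
The difference between `min` and `minBy` in the aggregation rows above can be modeled outside of Flink. The following plain-Java sketch (hypothetical helper names, not the Flink API; tuples modeled as `int[]`) mimics the two behaviors:

```java
import java.util.Arrays;
import java.util.List;

public class MinByDemo {

    // min(field): aggregation that tracks only the minimum *value*
    // of the given tuple field.
    static int min(List<int[]> elements, int field) {
        int best = Integer.MAX_VALUE;
        for (int[] e : elements) {
            best = Math.min(best, e[field]);
        }
        return best;
    }

    // minBy(field): aggregation that tracks the whole *element*
    // holding the minimum value of the given tuple field.
    static int[] minBy(List<int[]> elements, int field) {
        int[] best = elements.get(0);
        for (int[] e : elements) {
            if (e[field] < best[field]) {
                best = e;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<int[]> data = Arrays.asList(
            new int[]{1, 5}, new int[]{2, 3}, new int[]{3, 7});
        System.out.println(min(data, 1));                    // the value: 3
        System.out.println(Arrays.toString(minBy(data, 1))); // the element: [2, 3]
    }
}
```

Note that `min` loses the rest of the element: only `minBy` tells you *which* record carried the minimum.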
+
+The following transformations are available on data streams of Tuples:
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Project</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>Selects a subset of fields from the tuples
+{% highlight java %}
+DataStream<Tuple3<Integer, Double, String>> in = // [...]
+DataStream<Tuple2<String, Integer>> out = in.project(2,0);
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
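
The project transformation above is plain field selection by index. This sketch (a stand-in for `DataStream#project`, with tuples modeled as `Object[]` instead of Flink's `Tuple` classes) shows why `project(2, 0)` turns a `(Integer, Double, String)` tuple into a `(String, Integer)` tuple:

```java
import java.util.ArrayList;
import java.util.List;

public class ProjectDemo {

    // Build a new tuple from the given field indices of each input tuple,
    // in the order the indices are listed.
    static List<Object[]> project(List<Object[]> in, int... fields) {
        List<Object[]> out = new ArrayList<>();
        for (Object[] tuple : in) {
            Object[] projected = new Object[fields.length];
            for (int i = 0; i < fields.length; i++) {
                projected[i] = tuple[fields[i]];
            }
            out.add(projected);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object[]> in = new ArrayList<>();
        in.add(new Object[]{1, 2.0, "a"});
        // project(2, 0): field 2 first, then field 0
        System.out.println(java.util.Arrays.toString(project(in, 2, 0).get(0))); // [a, 1]
    }
}
```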
+
+
+# Physical partitioning
+
+Flink also gives low-level control (if desired) on the exact stream 
partitioning after a transformation,
+via the following functions.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Custom partitioning</strong><br>DataStream &rarr; 
DataStream</td>
+      <td>
+        <p>
+            Uses a user-defined Partitioner to select the target task for each 
element.
+            {% highlight java %}
+dataStream.partitionCustom(partitioner, "someKey");
+dataStream.partitionCustom(partitioner, 0);
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+     <td><strong>Random partitioning</strong><br>DataStream &rarr; 
DataStream</td>
+     <td>
+       <p>
+            Partitions elements randomly according to a uniform distribution.
+            {% highlight java %}
+dataStream.shuffle();
+            {% endhighlight %}
+       </p>
+     </td>
+   </tr>
+   <tr>
+      <td><strong>Rebalancing (Round-robin 
partitioning)</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements round-robin, creating equal load per 
partition. Useful for performance
+            optimization in the presence of data skew.
+            {% highlight java %}
+dataStream.rebalance();
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Rescaling</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements, round-robin, to a subset of downstream 
operations. This is
+            useful if you want to have pipelines where you, for example, fan 
out from
+            each parallel instance of a source to a subset of several mappers 
to distribute load
+            but don't want the full rebalance that rebalance() would incur. 
This would require only
+            local data transfers instead of transferring data over the network, 
depending on
+            other configuration values such as the number of slots of 
TaskManagers.
+        </p>
+        <p>
+            The subset of downstream operations to which the upstream 
operation sends
+            elements depends on the degree of parallelism of both the upstream 
and downstream operation.
+            For example, if the upstream operation has parallelism 2 and the 
downstream operation
+            has parallelism 6, then one upstream operation would distribute 
elements to three
+            downstream operations while the other upstream operation would 
distribute to the other
+            three downstream operations. If, on the other hand, the downstream 
operation has parallelism
+            2 while the upstream operation has parallelism 6 then three 
upstream operations would
+            distribute to one downstream operation while the other three 
upstream operations would
+            distribute to the other downstream operation.
+        </p>
+        <p>
+            In cases where the different parallelisms are not multiples of 
each other one or several
+            downstream operations will have a differing number of inputs from 
upstream operations.
+        </p>
+        <p>
+            Please see this figure for a visualization of the connection 
pattern in the above
+            example:
+        </p>
+
+        <div style="text-align: center">
+            <img src="{{ site.baseurl }}/fig/rescale.svg" alt="Rescale connection pattern" />
+            </div>
+
+
+        <p>
+                    {% highlight java %}
+dataStream.rescale();
+            {% endhighlight %}
+
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Broadcasts elements to every partition.
+            {% highlight java %}
+dataStream.broadcast();
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Custom partitioning</strong><br>DataStream &rarr; 
DataStream</td>
+      <td>
+        <p>
+            Uses a user-defined Partitioner to select the target task for each 
element.
+            {% highlight scala %}
+dataStream.partitionCustom(partitioner, "someKey")
+dataStream.partitionCustom(partitioner, 0)
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+     <td><strong>Random partitioning</strong><br>DataStream &rarr; 
DataStream</td>
+     <td>
+       <p>
+            Partitions elements randomly according to a uniform distribution.
+            {% highlight scala %}
+dataStream.shuffle()
+            {% endhighlight %}
+       </p>
+     </td>
+   </tr>
+   <tr>
+      <td><strong>Rebalancing (Round-robin 
partitioning)</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements round-robin, creating equal load per 
partition. Useful for performance
+            optimization in the presence of data skew.
+            {% highlight scala %}
+dataStream.rebalance()
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Rescaling</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements, round-robin, to a subset of downstream 
operations. This is
+            useful if you want to have pipelines where you, for example, fan 
out from
+            each parallel instance of a source to a subset of several mappers 
to distribute load
+            but don't want the full rebalance that rebalance() would incur. 
This would require only
+            local data transfers instead of transferring data over the network, 
depending on
+            other configuration values such as the number of slots of 
TaskManagers.
+        </p>
+        <p>
+            The subset of downstream operations to which the upstream 
operation sends
+            elements depends on the degree of parallelism of both the upstream 
and downstream operation.
+            For example, if the upstream operation has parallelism 2 and the 
downstream operation
+            has parallelism 4, then one upstream operation would distribute 
elements to two
+            downstream operations while the other upstream operation would 
distribute to the other
+            two downstream operations. If, on the other hand, the downstream 
operation has parallelism
+            2 while the upstream operation has parallelism 4 then two upstream 
operations would
+            distribute to one downstream operation while the other two 
upstream operations would
+            distribute to the other downstream operation.
+        </p>
+        <p>
+            In cases where the different parallelisms are not multiples of 
each other one or several
+            downstream operations will have a differing number of inputs from 
upstream operations.
+
+        </p>
+        <p>
+            Please see this figure for a visualization of the connection 
pattern in the above
+            example:
+        </p>
+
+        <div style="text-align: center">
+            <img src="{{ site.baseurl }}/fig/rescale.svg" alt="Rescale connection pattern" />
+            </div>
+
+
+        <p>
+                    {% highlight scala %}
+dataStream.rescale()
+            {% endhighlight %}
+
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Broadcasts elements to every partition.
+            {% highlight scala %}
+dataStream.broadcast()
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
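
The rescale connection pattern described above can be computed directly. This sketch (a simplified model, valid for the case where one parallelism evenly divides the other, as in the examples above) lists which downstream subtasks each upstream subtask connects to:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RescaleDemo {

    // Map each upstream subtask index to the downstream subtask indices it
    // sends to under rescale(), assuming the parallelisms evenly divide.
    static Map<Integer, List<Integer>> connections(int upstream, int downstream) {
        Map<Integer, List<Integer>> out = new LinkedHashMap<>();
        if (downstream >= upstream) {
            // fan out: each upstream subtask feeds a contiguous block
            int fan = downstream / upstream;
            for (int i = 0; i < upstream; i++) {
                List<Integer> targets = new ArrayList<>();
                for (int j = i * fan; j < (i + 1) * fan; j++) {
                    targets.add(j);
                }
                out.put(i, targets);
            }
        } else {
            // fan in: contiguous blocks of upstream subtasks share one target
            int fan = upstream / downstream;
            for (int i = 0; i < upstream; i++) {
                out.put(i, Collections.singletonList(i / fan));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // upstream parallelism 2, downstream parallelism 6 (the example above)
        System.out.println(connections(2, 6)); // {0=[0, 1, 2], 1=[3, 4, 5]}
    }
}
```

Because the blocks are contiguous, only local channels are needed when the subtasks of a block are scheduled on the same TaskManager.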
+
+# Task chaining and resource groups
+
+Chaining two subsequent transformations means co-locating them within the same 
thread for better
+performance. Flink by default chains operators if this is possible (e.g., two 
subsequent map
+transformations). The API gives fine-grained control over chaining if desired:
+
+Use `StreamExecutionEnvironment.disableOperatorChaining()` if you want to 
disable chaining in
+the whole job. For more fine grained control, the following functions are 
available. Note that
+these functions can only be used right after a DataStream transformation as 
they refer to the
+previous transformation. For example, you can use 
`someStream.map(...).startNewChain()`, but
+you cannot use `someStream.startNewChain()`.
+
+A resource group is a slot in Flink, see
+[slots]({{site.baseurl}}/setup/config.html#configuring-taskmanager-processing-slots).
 You can
+manually isolate operators in separate slots if desired.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td>Start new chain</td>
+      <td>
+        <p>Begin a new chain, starting with this operator. The two
+       mappers will be chained, and filter will not be chained to
+       the first mapper.
+{% highlight java %}
+someStream.filter(...).map(...).startNewChain().map(...);
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td>Disable chaining</td>
+      <td>
+        <p>Do not chain the map operator
+{% highlight java %}
+someStream.map(...).disableChaining();
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td>Set slot sharing group</td>
+      <td>
+        <p>Set the slot sharing group of an operation. Flink will put 
operations with the same
+        slot sharing group into the same slot while keeping operations that 
don't have the
+        slot sharing group in other slots. This can be used to isolate slots. 
The slot sharing
+        group is inherited from input operations if all input operations are 
in the same slot
+        sharing group.
+        The name of the default slot sharing group is "default"; operations 
can explicitly
+        be put into this group by calling slotSharingGroup("default").
+{% highlight java %}
+someStream.filter(...).slotSharingGroup("name");
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td>Start new chain</td>
+      <td>
+        <p>Begin a new chain, starting with this operator. The two
+       mappers will be chained, and filter will not be chained to
+       the first mapper.
+{% highlight scala %}
+someStream.filter(...).map(...).startNewChain().map(...)
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td>Disable chaining</td>
+      <td>
+        <p>Do not chain the map operator
+{% highlight scala %}
+someStream.map(...).disableChaining()
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  <tr>
+      <td>Set slot sharing group</td>
+      <td>
+        <p>Set the slot sharing group of an operation. Flink will put 
operations with the same
+        slot sharing group into the same slot while keeping operations that 
don't have the
+        slot sharing group in other slots. This can be used to isolate slots. 
The slot sharing
+        group is inherited from input operations if all input operations are 
in the same slot
+        sharing group.
+        The name of the default slot sharing group is "default"; operations 
can explicitly
+        be put into this group by calling slotSharingGroup("default").
+{% highlight scala %}
+someStream.filter(...).slotSharingGroup("name")
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
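
Conceptually, two chained operators behave like composed functions invoked in the same thread, with no serialization or network transfer between them. A minimal sketch of that idea using plain `java.util.function` (not the Flink runtime):

```java
import java.util.function.Function;

public class ChainDemo {

    // Two map operators chained into one: downstream of the chain sees the
    // composed result, computed by a single call on a single thread.
    static final Function<Integer, Integer> MAP_A = x -> x * 2;
    static final Function<Integer, Integer> MAP_B = x -> x + 1;
    static final Function<Integer, Integer> CHAINED = MAP_A.andThen(MAP_B);

    public static void main(String[] args) {
        System.out.println(CHAINED.apply(5)); // 5 * 2 + 1 = 11
    }
}
```

Disabling chaining corresponds to handing each record across a thread (and possibly network) boundary between `MAP_A` and `MAP_B`, which is why chaining is the default where possible.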
+
+
+{% top %}
+
