[FLINK-7370] [docs] Relocate files according to new structure

This closes #4477.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31b86f60
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31b86f60
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31b86f60

Branch: refs/heads/master
Commit: 31b86f605ae1d2e3523d10a88e697dc4f05aef30
Parents: cafa45e
Author: twalthr <twal...@apache.org>
Authored: Wed Aug 9 12:21:31 2017 +0200
Committer: twalthr <twal...@apache.org>
Committed: Wed Aug 9 13:56:43 2017 +0200

----------------------------------------------------------------------
 docs/concepts/programming-model.md            |    6 +-
 docs/dev/connectors/index.md                  |    2 +-
 docs/dev/datastream_api.md                    |   25 +-
 docs/dev/event_time.md                        |    2 +-
 docs/dev/event_timestamp_extractors.md        |    2 +-
 docs/dev/stream/asyncio.md                    |  253 -----
 docs/dev/stream/operators.md                  | 1169 --------------------
 docs/dev/stream/operators/asyncio.md          |  253 +++++
 docs/dev/stream/operators/index.md            | 1169 ++++++++++++++++++++
 docs/dev/stream/operators/process_function.md |  238 ++++
 docs/dev/stream/operators/windows.md          | 1039 +++++++++++++++++
 docs/dev/stream/process_function.md           |  238 ----
 docs/dev/stream/side_output.md                |    2 +-
 docs/dev/stream/state/checkpointing.md        |    4 +-
 docs/dev/stream/windows.md                    | 1039 -----------------
 docs/ops/state/checkpoints.md                 |    2 +-
 docs/redirects/windows.md                     |   24 -
 docs/redirects/windows_2.md                   |   24 -
 18 files changed, 2727 insertions(+), 2764 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/31b86f60/docs/concepts/programming-model.md
----------------------------------------------------------------------
diff --git a/docs/concepts/programming-model.md 
b/docs/concepts/programming-model.md
index 926fdd7..cb127c4 100644
--- a/docs/concepts/programming-model.md
+++ b/docs/concepts/programming-model.md
@@ -34,7 +34,7 @@ Flink offers different levels of abstraction to develop 
streaming/batch applicat
 <img src="../fig/levels_of_abstraction.svg" alt="Programming levels of 
abstraction" class="offset" width="80%" />
 
   - The lowest level abstraction simply offers **stateful streaming**. It is 
embedded into the [DataStream API](../dev/datastream_api.html)
-    via the [Process Function](../dev/stream/process_function.html). It allows users freely process events from one or more streams,
+    via the [Process Function](../dev/stream/operators/process_function.html). It allows users to freely process events from one or more streams,
     and use consistent fault tolerant *state*. In addition, users can register 
event time and processing time callbacks,
     allowing programs to realize sophisticated computations.
 
@@ -82,7 +82,7 @@ Often there is a one-to-one correspondence between the 
transformations in the pr
 in the dataflow. Sometimes, however, one transformation may consist of 
multiple transformation operators.
 
 Sources and sinks are documented in the [streaming 
connectors](../dev/connectors/index.html) and [batch 
connectors](../dev/batch/connectors.html) docs.
-Transformations are documented in [DataStream operators]({{ site.baseurl 
}}/dev/stream/operators.html) and [DataSet 
transformations](../dev/batch/dataset_transformations.html).
+Transformations are documented in [DataStream operators]({{ site.baseurl 
}}/dev/stream/operators/index.html) and [DataSet 
transformations](../dev/batch/dataset_transformations.html).
 
 {% top %}
 
@@ -133,7 +133,7 @@ One typically distinguishes different types of windows, 
such as *tumbling window
 <img src="../fig/windows.svg" alt="Time- and Count Windows" class="offset" 
width="80%" />
 
 More window examples can be found in this [blog 
post](https://flink.apache.org/news/2015/12/04/Introducing-windows.html).
-More details are in the [window docs](../dev/windows.html).
+More details are in the [window docs](../dev/stream/operators/windows.html).
 
 {% top %}
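The relocated Process Function page itself is not shown in this diff, so as a rough orientation for the low-level abstraction mentioned in the first hunk above, here is a minimal, hypothetical sketch of a keyed `ProcessFunction` that keeps per-key state and registers an event-time callback. The input type, field names, and the one-minute timer are illustrative assumptions, not content taken from the moved page:

{% highlight java %}
// Hedged sketch: per-key state plus an event-time timer callback.
DataStream<Tuple2<String, Long>> events = ...; // hypothetical input stream

events
    .keyBy(0)
    .process(new ProcessFunction<Tuple2<String, Long>, String>() {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
            Long current = count.value();
            count.update(current == null ? 1L : current + 1);
            // register an event-time callback one minute after this element's timestamp
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60 * 1000);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            out.collect("count so far: " + count.value());
        }
    });
{% endhighlight %}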
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31b86f60/docs/dev/connectors/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/index.md b/docs/dev/connectors/index.md
index 00c0853..f3ae039 100644
--- a/docs/dev/connectors/index.md
+++ b/docs/dev/connectors/index.md
@@ -71,7 +71,7 @@ Additional streaming connectors for Flink are being released 
through [Apache Bah
 Using a connector isn't the only way to get data in and out of Flink.
 One common pattern is to query an external database or web service in a `Map` 
or `FlatMap`
 in order to enrich the primary datastream.
-Flink offers an API for [Asynchronous I/O]({{ site.baseurl 
}}/dev/stream/asyncio.html)
+Flink offers an API for [Asynchronous I/O]({{ site.baseurl 
}}/dev/stream/operators/asyncio.html)
 to make it easier to do this kind of enrichment efficiently and robustly.
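For illustration, a hedged sketch of the `Map`-based enrichment pattern mentioned above, using a hypothetical blocking `ProfileService` client (everything except the Flink classes is an assumption). Each element pays a full request/response round trip, which is exactly the cost the Async I/O API is designed to hide:

{% highlight java %}
DataStream<String> userIds = ...;

DataStream<Tuple2<String, String>> enriched = userIds
    .map(new RichMapFunction<String, Tuple2<String, String>>() {

        private transient ProfileService client; // hypothetical synchronous client

        @Override
        public void open(Configuration parameters) {
            client = new ProfileService();
        }

        @Override
        public Tuple2<String, String> map(String userId) throws Exception {
            // blocks until the external service answers
            return new Tuple2<>(userId, client.lookupProfile(userId));
        }
    });
{% endhighlight %}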
 
 ### Queryable State

http://git-wip-us.apache.org/repos/asf/flink/blob/31b86f60/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index b7f02ef..0fc6033 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -38,7 +38,7 @@ to the basic concepts of the Flink API.
 In order to create your own Flink DataStream program, we encourage you to 
start with
 [anatomy of a Flink Program]({{ site.baseurl 
}}/dev/api_concepts.html#anatomy-of-a-flink-program)
 and gradually add your own
-[stream transformations]({{ site.baseurl }}/dev/stream/operators.html). The 
remaining sections act as references for additional
+[stream transformations]({{ site.baseurl }}/dev/stream/operators/index.html). 
The remaining sections act as references for additional
 operations and advanced features.
 
 
@@ -135,12 +135,6 @@ word count program. If you want to see counts greater than 
1, type the same word
 
 {% top %}
 
-DataStream Transformations
---------------------------
-
-Moved. Please see [operators]({{ site.baseurl }}/dev/stream/operators.html) 
for an overview of the
-available stream transformations.
-
 Data Sources
 ------------
 
@@ -264,6 +258,13 @@ Custom:
 
 {% top %}
 
+DataStream Transformations
+--------------------------
+
+Please see [operators]({{ site.baseurl }}/dev/stream/operators/index.html) for 
an overview of the available stream transformations.
+
+{% top %}
+
 Data Sinks
 ----------
 
@@ -624,3 +625,13 @@ val myOutput: Iterator[(String, Int)] = 
DataStreamUtils.collect(myResult.getJava
 </div>
 
 {% top %}
+
+Where to go next?
+-----------------
+
+* [Operators]({{ site.baseurl }}/dev/stream/operators/index.html): 
Specification of available streaming operators.
+* [Event Time]({{ site.baseurl }}/dev/event_time.html): Introduction to 
Flink's notion of time.
+* [State & Fault Tolerance]({{ site.baseurl }}/dev/stream/state/index.html): 
Explanation of how to develop stateful applications.
+* [Connectors]({{ site.baseurl }}/dev/connectors/index.html): Description of 
available input and output connectors.
+
+{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/31b86f60/docs/dev/event_time.md
----------------------------------------------------------------------
diff --git a/docs/dev/event_time.md b/docs/dev/event_time.md
index 3e5120b..70a7812 100644
--- a/docs/dev/event_time.md
+++ b/docs/dev/event_time.md
@@ -205,7 +205,7 @@ causes too much delay in the evaluation of the event time 
windows.
 
 For this reason, streaming programs may explicitly expect some *late* 
elements. Late elements are elements that
 arrive after the system's event time clock (as signaled by the watermarks) has 
already passed the time of the late element's
-timestamp. See [Allowed Lateness]({{ site.baseurl 
}}/dev/windows.html#allowed-lateness) for more information on how to work
+timestamp. See [Allowed Lateness]({{ site.baseurl 
}}/dev/stream/operators/windows.html#allowed-lateness) for more information on 
how to work
 with late elements in event time windows.
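As a quick orientation before following the link, a minimal sketch of declaring allowed lateness on an event-time window; the window size and lateness values are arbitrary assumptions:

{% highlight java %}
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    // keep window state for 5 extra seconds so late elements can still update the result
    .allowedLateness(Time.seconds(5))
    .sum(1);
{% endhighlight %}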
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31b86f60/docs/dev/event_timestamp_extractors.md
----------------------------------------------------------------------
diff --git a/docs/dev/event_timestamp_extractors.md 
b/docs/dev/event_timestamp_extractors.md
index 34a27ff..b270491 100644
--- a/docs/dev/event_timestamp_extractors.md
+++ b/docs/dev/event_timestamp_extractors.md
@@ -79,7 +79,7 @@ time for testing. For these cases, Flink provides the 
`BoundedOutOfOrdernessTime
 the `maxOutOfOrderness`, i.e. the maximum amount of time an element is allowed 
to be late before being ignored when computing the
 final result for the given window. Lateness corresponds to the result of `t - 
t_w`, where `t` is the (event-time) timestamp of an
 element, and `t_w` that of the previous watermark. If `lateness > 0` then the 
element is considered late and is, by default, ignored when computing
-the result of the job for its corresponding window. See the documentation 
about [allowed lateness]({{ site.baseurl }}/dev/windows.html#allowed-lateness)
+the result of the job for its corresponding window. See the documentation 
about [allowed lateness]({{ site.baseurl 
}}/dev/stream/operators/windows.html#allowed-lateness)
 for more information about working with late elements.
 
 <div class="codetabs" markdown="1">

http://git-wip-us.apache.org/repos/asf/flink/blob/31b86f60/docs/dev/stream/asyncio.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/asyncio.md b/docs/dev/stream/asyncio.md
deleted file mode 100644
index ec9c8ba..0000000
--- a/docs/dev/stream/asyncio.md
+++ /dev/null
@@ -1,253 +0,0 @@
----
-title: "Asynchronous I/O for External Data Access"
-nav-title: "Async I/O"
-nav-parent_id: operators
-nav-pos: 60
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* ToC
-{:toc}
-
-This page explains the use of Flink's API for asynchronous I/O with external 
data stores.
-For users not familiar with asynchronous or event-driven programming, an 
article about Futures and
-event-driven programming may be useful preparation.
-
-Note: Details about the design and implementation of the asynchronous I/O 
utility can be found in the proposal and design document
-[FLIP-12: Asynchronous I/O Design and 
Implementation](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673).
-
-
-## The need for Asynchronous I/O Operations
-
-When interacting with external systems (for example when enriching stream 
events with data stored in a database), one needs to take care
-that communication delay with the external system does not dominate the 
streaming application's total work.
-
-Naively accessing data in the external database, for example in a 
`MapFunction`, typically means **synchronous** interaction:
-A request is sent to the database and the `MapFunction` waits until the 
response has been received. In many cases, this waiting
-makes up the vast majority of the function's time.
-
-Asynchronous interaction with the database means that a single parallel 
function instance can handle many requests concurrently and
-receive the responses concurrently. That way, the waiting time can be overlaid with sending other requests and
-receiving responses. At the very least, the waiting time is amortized over multiple requests. This leads in most cases to much higher
-streaming throughput.
-
-<img src="../../fig/async_io.svg" class="center" width="50%" />
-
-*Note:* Improving throughput by just scaling the `MapFunction` to a very high 
parallelism is in some cases possible as well, but usually
-comes at a very high resource cost: Having many more parallel MapFunction 
instances means more tasks, threads, Flink-internal network
-connections, network connections to the database, buffers, and general 
internal bookkeeping overhead.
-
-
-## Prerequisites
-
-As illustrated in the section above, implementing proper asynchronous I/O to a 
database (or key/value store) requires a client
-to that database that supports asynchronous requests. Many popular databases 
offer such a client.
-
-In the absence of such a client, one can try to turn a synchronous client into a limited concurrent client by creating
-multiple clients and handling the synchronous calls with a thread pool. 
However, this approach is usually less
-efficient than a proper asynchronous client.
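A rough sketch of that workaround, assuming a hypothetical blocking `SyncClient` whose `query` call returns a `String`, and plain Java 8 futures (none of this is part of the original page). The pool size bounds how many synchronous calls are in flight at once:

{% highlight java %}
// hypothetical blocking client wrapped in a fixed-size thread pool
ExecutorService pool = Executors.newFixedThreadPool(10);
SyncClient client = new SyncClient();

String key = ...;
CompletableFuture<String> result =
    CompletableFuture.supplyAsync(() -> client.query(key), pool);
{% endhighlight %}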
-
-
-## Async I/O API
-
-Flink's Async I/O API allows users to use asynchronous request clients with 
data streams. The API handles the integration with
-data streams, as well as handling order, event time, fault tolerance, etc.
-
-Assuming one has an asynchronous client for the target database, three parts 
are needed to implement a stream transformation
-with asynchronous I/O against the database:
-
-  - An implementation of `AsyncFunction` that dispatches the requests
-  - A *callback* that takes the result of the operation and hands it to the 
`AsyncCollector`
-  - Applying the async I/O operation on a DataStream as a transformation
-
-The following code example illustrates the basic pattern:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-// This example implements the asynchronous request and callback with Futures 
that have the
-// interface of Java 8's futures (which is the same one followed by Flink's 
Future)
-
-/**
- * An implementation of the 'AsyncFunction' that sends requests and sets the 
callback.
- */
-class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, 
String>> {
-
-    /** The database specific client that can issue concurrent requests with 
callbacks */
-    private transient DatabaseClient client;
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        client = new DatabaseClient(host, port, credentials);
-    }
-
-    @Override
-    public void close() throws Exception {
-        client.close();
-    }
-
-    @Override
-    public void asyncInvoke(final String str, final 
AsyncCollector<Tuple2<String, String>> asyncCollector) throws Exception {
-
-        // issue the asynchronous request, receive a future for result
-        Future<String> resultFuture = client.query(str);
-
-        // set the callback to be executed once the request by the client is 
complete
-        // the callback simply forwards the result to the collector
-        resultFuture.thenAccept( (String result) -> {
-
-            asyncCollector.collect(Collections.singleton(new Tuple2<>(str, 
result)));
-         
-        });
-    }
-}
-
-// create the original stream
-DataStream<String> stream = ...;
-
-// apply the async I/O transformation
-DataStream<Tuple2<String, String>> resultStream =
-    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, 
TimeUnit.MILLISECONDS, 100);
-
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-/**
- * An implementation of the 'AsyncFunction' that sends requests and sets the 
callback.
- */
-class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
-
-    /** The database specific client that can issue concurrent requests with 
callbacks */
-    lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)
-
-    /** The context used for the future callbacks */
-    implicit lazy val executor: ExecutionContext = 
ExecutionContext.fromExecutor(Executors.directExecutor())
-
-
-    override def asyncInvoke(str: String, asyncCollector: 
AsyncCollector[(String, String)]): Unit = {
-
-        // issue the asynchronous request, receive a future for the result
-        val resultFuture: Future[String] = client.query(str)
-
-        // set the callback to be executed once the request by the client is 
complete
-        // the callback simply forwards the result to the collector
-        resultFuture.onSuccess {
-            case result: String => asyncCollector.collect(Iterable((str, 
result)));
-        }
-    }
-}
-
-// create the original stream
-val stream: DataStream[String] = ...
-
-// apply the async I/O transformation
-val resultStream: DataStream[(String, String)] =
-    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, 
TimeUnit.MILLISECONDS, 100)
-
-{% endhighlight %}
-</div>
-</div>
-
-**Important note**: The `AsyncCollector` is completed with the first call of 
`AsyncCollector.collect`.
-All subsequent `collect` calls will be ignored.
-
-The following two parameters control the asynchronous operations:
-
-  - **Timeout**: The timeout defines how long an asynchronous request may take 
before it is considered failed. This parameter
-    guards against dead/failed requests.
-
-  - **Capacity**: This parameter defines how many asynchronous requests may be 
in progress at the same time.
-    Even though the async I/O approach leads typically to much better 
throughput, the operator can still be the bottleneck in
-    the streaming application. Limiting the number of concurrent requests 
ensures that the operator will not
-    accumulate an ever-growing backlog of pending requests, but that it will 
trigger backpressure once the capacity
-    is exhausted.
-
-
-### Order of Results
-
-The concurrent requests issued by the `AsyncFunction` frequently complete in 
some undefined order, based on which request finished first.
-To control in which order the resulting records are emitted, Flink offers two 
modes:
-
-  - **Unordered**: Result records are emitted as soon as the asynchronous 
request finishes.
-    The order of the records in the stream is different after the async I/O 
operator than before.
-    This mode has the lowest latency and lowest overhead, when used with 
*processing time* as the basic time characteristic.
-    Use `AsyncDataStream.unorderedWait(...)` for this mode.
-
-  - **Ordered**: In that case, the stream order is preserved. Result records 
are emitted in the same order as the asynchronous
-    requests are triggered (the order of the operator's input records). To achieve that, the operator buffers a result record
-    until all its preceding records are emitted (or timed out).
-    This usually introduces some amount of extra latency and some overhead in 
checkpointing, because records or results are maintained
-    in the checkpointed state for a longer time, compared to the unordered 
mode.
-    Use `AsyncDataStream.orderedWait(...)` for this mode.
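For completeness, the ordered variant applied to the example from above differs only in the method used (same hypothetical `AsyncDatabaseRequest`, timeout and capacity):

{% highlight java %}
// emit results in the order of the input records
DataStream<Tuple2<String, String>> orderedResultStream =
    AsyncDataStream.orderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
{% endhighlight %}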
-
-
-### Event Time
-
-When the streaming application works with [event time](../event_time.html), 
watermarks will be handled correctly by the
-asynchronous I/O operator. That means concretely the following for the two 
order modes:
-
-  - **Unordered**: Watermarks do not overtake records and vice versa, meaning 
watermarks establish an *order boundary*.
-    Records are emitted unordered only between watermarks.
-    A record occurring after a certain watermark will be emitted only after 
that watermark was emitted.
-    The watermark in turn will be emitted only after all result records from 
inputs before that watermark were emitted.
-
-    That means that in the presence of watermarks, the *unordered* mode 
introduces some of the same latency and management
-    overhead as the *ordered* mode does. The amount of that overhead depends 
on the watermark frequency.
-
-  - **Ordered**: The order of watermarks and records is preserved, just like the order between records is preserved. There is no
-    significant change in overhead, compared to working with *processing time*.
-
-Please recall that *Ingestion Time* is a special case of *event time* with 
automatically generated watermarks that
-are based on the sources' processing time.
-
-
-### Fault Tolerance Guarantees
-
-The asynchronous I/O operator offers full exactly-once fault tolerance 
guarantees. It stores the records for in-flight
-asynchronous requests in checkpoints and restores/re-triggers the requests 
when recovering from a failure.
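Presumably this guarantee only takes effect when checkpointing is enabled for the job, e.g. (interval chosen arbitrarily):

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable periodic checkpoints so in-flight async requests are persisted and re-triggered on recovery
env.enableCheckpointing(10000);
{% endhighlight %}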
-
-
-### Implementation Tips
-
-For implementations with *Futures* that have an *Executor* (or *ExecutionContext* in Scala) for callbacks, we suggest using a `DirectExecutor`, because the
-callback typically does minimal work, and a `DirectExecutor` avoids an additional thread-to-thread handover overhead. The callback typically only hands
-the result to the `AsyncCollector`, which adds it to the output buffer. From there, the heavy logic that includes record emission and interaction
-with the checkpoint bookkeeping happens in a dedicated thread pool anyway.
-
-A `DirectExecutor` can be obtained via 
`org.apache.flink.runtime.concurrent.Executors.directExecutor()` or
-`com.google.common.util.concurrent.MoreExecutors.directExecutor()`.
-
-
-### Caveat
-
-**The AsyncFunction is not called Multi-Threaded**
-
-A common confusion that we want to explicitly point out here is that the 
`AsyncFunction` is not called in a multi-threaded fashion.
-There exists only one instance of the `AsyncFunction` and it is called 
sequentially for each record in the respective partition
-of the stream. Unless the `asyncInvoke(...)` method returns fast and relies on 
a callback (by the client), it will not result in
-proper asynchronous I/O.
-
-For example, the following patterns result in a blocking `asyncInvoke(...)` function and thus void the asynchronous behavior:
-
-  - Using a database client whose lookup/query method call blocks until the 
result has been received back
-
-  - Blocking/waiting on the future-type objects returned by an asynchronous client inside the `asyncInvoke(...)` method
-
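A hedged sketch contrasting the blocking anti-pattern above with the callback shape used in the earlier example; the two methods are alternatives, and `blockingQuery` is a hypothetical synchronous call on the same illustrative client:

{% highlight java %}
// Anti-pattern: the call blocks inside asyncInvoke(), so effectively only one
// request per parallel instance is ever in flight.
@Override
public void asyncInvoke(String key, AsyncCollector<String> collector) throws Exception {
    String result = client.blockingQuery(key);   // hypothetical blocking call
    collector.collect(Collections.singleton(result));
}

// Intended shape: return immediately and let the client's callback complete the collector.
@Override
public void asyncInvoke(String key, AsyncCollector<String> collector) throws Exception {
    client.query(key).thenAccept(result ->
        collector.collect(Collections.singleton(result)));
}
{% endhighlight %}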

http://git-wip-us.apache.org/repos/asf/flink/blob/31b86f60/docs/dev/stream/operators.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators.md b/docs/dev/stream/operators.md
deleted file mode 100644
index 70bd9ae..0000000
--- a/docs/dev/stream/operators.md
+++ /dev/null
@@ -1,1169 +0,0 @@
----
-title: "Operators"
-nav-id: operators
-nav-show_overview: true
-nav-parent_id: streaming
-nav-pos: 9
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Operators transform one or more DataStreams into a new DataStream. Programs 
can combine
-multiple transformations into sophisticated topologies.
-
-This section gives a description of all the available transformations, the effective physical
-partitioning after applying them, as well as insights into Flink's operator chaining.
-
-* toc
-{:toc}
-
-# DataStream Transformations
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-          <td><strong>Map</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Takes one element and produces one element. A map function that 
doubles the values of the input stream:</p>
-    {% highlight java %}
-DataStream<Integer> dataStream = //...
-dataStream.map(new MapFunction<Integer, Integer>() {
-    @Override
-    public Integer map(Integer value) throws Exception {
-        return 2 * value;
-    }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-
-        <tr>
-          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Takes one element and produces zero, one, or more elements. A 
flatmap function that splits sentences to words:</p>
-    {% highlight java %}
-dataStream.flatMap(new FlatMapFunction<String, String>() {
-    @Override
-    public void flatMap(String value, Collector<String> out)
-        throws Exception {
-        for(String word: value.split(" ")){
-            out.collect(word);
-        }
-    }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Evaluates a boolean function for each element and retains those 
for which the function returns true.
-            A filter that filters out zero values:
-            </p>
-    {% highlight java %}
-dataStream.filter(new FilterFunction<Integer>() {
-    @Override
-    public boolean filter(Integer value) throws Exception {
-        return value != 0;
-    }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
-          <td>
-            <p>Logically partitions a stream into disjoint partitions, each 
partition containing elements of the same key.
-            Internally, this is implemented with hash partitioning. See <a 
href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys</a> on how 
to specify keys.
-            This transformation returns a KeyedStream.</p>
-    {% highlight java %}
-dataStream.keyBy("someKey") // Key by field "someKey"
-dataStream.keyBy(0) // Key by the first element of a Tuple
-    {% endhighlight %}
-            <p>
-            <span class="label label-danger">Attention</span>
-            A type <strong>cannot be a key</strong> if:
-           <ol>
-           <li> it is a POJO type but does not override the 
<em>hashCode()</em> method and
-           relies on the <em>Object.hashCode()</em> implementation.</li>
-           <li> it is an array of any type.</li>
-           </ol>
-           </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
-          <td>
-            <p>A "rolling" reduce on a keyed data stream. Combines the current 
element with the last reduced value and
-            emits the new value.
-                    <br/>
-               <br/>
-            A reduce function that creates a stream of partial sums:</p>
-            {% highlight java %}
-keyedStream.reduce(new ReduceFunction<Integer>() {
-    @Override
-    public Integer reduce(Integer value1, Integer value2)
-    throws Exception {
-        return value1 + value2;
-    }
-});
-            {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
-          <td>
-          <p>A "rolling" fold on a keyed data stream with an initial value.
-          Combines the current element with the last folded value and
-          emits the new value.
-          <br/>
-          <br/>
-          <p>A fold function that, when applied on the sequence (1,2,3,4,5),
-          emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
-          {% highlight java %}
-DataStream<String> result =
-  keyedStream.fold("start", new FoldFunction<Integer, String>() {
-    @Override
-    public String fold(String current, Integer value) {
-        return current + "-" + value;
-    }
-  });
-          {% endhighlight %}
-          </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Aggregations</strong><br>KeyedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Rolling aggregations on a keyed data stream. The difference 
between min
-           and minBy is that min returns the minimum value, whereas minBy 
returns
-           the element that has the minimum value in this field (same for max 
and maxBy).</p>
-    {% highlight java %}
-keyedStream.sum(0);
-keyedStream.sum("key");
-keyedStream.min(0);
-keyedStream.min("key");
-keyedStream.max(0);
-keyedStream.max("key");
-keyedStream.minBy(0);
-keyedStream.minBy("key");
-keyedStream.maxBy(0);
-keyedStream.maxBy("key");
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
-          <td>
-            <p>Windows can be defined on already partitioned KeyedStreams. 
Windows group the data in each
-            key according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
-            See <a href="windows.html">windows</a> for a complete description 
of windows.
-    {% highlight java %}
-dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // 
Last 5 seconds of data
-    {% endhighlight %}
-        </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>WindowAll</strong><br>DataStream &rarr; 
AllWindowedStream</td>
-          <td>
-              <p>Windows can be defined on regular DataStreams. Windows group 
all the stream events
-              according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
-              See <a href="windows.html">windows</a> for a complete 
description of windows.</p>
-              <p><strong>WARNING:</strong> This is in many cases a 
<strong>non-parallel</strong> transformation. All records will be
-               gathered in one task for the windowAll operator.</p>
-  {% highlight java %}
-dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 
seconds of data
-  {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Apply</strong><br>WindowedStream &rarr; 
DataStream<br>AllWindowedStream &rarr; DataStream</td>
-          <td>
-            <p>Applies a general function to the window as a whole. Below is a 
function that manually sums the elements of a window.</p>
-            <p><strong>Note:</strong> If you are using a windowAll 
transformation, you need to use an AllWindowFunction instead.</p>
-    {% highlight java %}
-windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, 
Tuple, Window>() {
-    public void apply (Tuple tuple,
-            Window window,
-            Iterable<Tuple2<String, Integer>> values,
-            Collector<Integer> out) throws Exception {
-        int sum = 0;
-        for (Tuple2<String, Integer> t: values) {
-            sum += t.f1;
-        }
-        out.collect (new Integer(sum));
-    }
-});
-
-// applying an AllWindowFunction on non-keyed window stream
-allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, 
Integer, Window>() {
-    public void apply (Window window,
-            Iterable<Tuple2<String, Integer>> values,
-            Collector<Integer> out) throws Exception {
-        int sum = 0;
-        for (Tuple2<String, Integer> t: values) {
-            sum += t.f1;
-        }
-        out.collect (new Integer(sum));
-    }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Applies a functional reduce function to the window and returns 
the reduced value.</p>
-    {% highlight java %}
-windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
-    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, 
Tuple2<String, Integer> value2) throws Exception {
-        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
-    }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Fold</strong><br>WindowedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Applies a functional fold function to the window and returns 
the folded value.
-               The example function, when applied on the sequence (1,2,3,4,5),
-               folds the sequence into the string "start-1-2-3-4-5":</p>
-    {% highlight java %}
-windowedStream.fold("start", new FoldFunction<Integer, String>() {
-    public String fold(String current, Integer value) {
-        return current + "-" + value;
-    }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Aggregations on windows</strong><br>WindowedStream 
&rarr; DataStream</td>
-          <td>
-            <p>Aggregates the contents of a window. The difference between min
-           and minBy is that min returns the minimum value, whereas minBy returns
-           the element that has the minimum value in this field (same for max 
and maxBy).</p>
-    {% highlight java %}
-windowedStream.sum(0);
-windowedStream.sum("key");
-windowedStream.min(0);
-windowedStream.min("key");
-windowedStream.max(0);
-windowedStream.max("key");
-windowedStream.minBy(0);
-windowedStream.minBy("key");
-windowedStream.maxBy(0);
-windowedStream.maxBy("key");
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
-          <td>
-            <p>Union of two or more data streams creating a new stream 
containing all the elements from all the streams. Note: If you union a data 
stream
-            with itself you will get each element twice in the resulting 
stream.</p>
-    {% highlight java %}
-dataStream.union(otherStream1, otherStream2, ...);
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>Join two data streams on a given key and a common window.</p>
-    {% highlight java %}
-dataStream.join(otherStream)
-    .where(<key selector>).equalTo(<key selector>)
-    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
-    .apply (new JoinFunction () {...});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>Cogroups two data streams on a given key and a common 
window.</p>
-    {% highlight java %}
-dataStream.coGroup(otherStream)
-    .where(0).equalTo(1)
-    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
-    .apply (new CoGroupFunction () {...});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; 
ConnectedStreams</td>
-          <td>
-            <p>"Connects" two data streams retaining their types. Connect 
allowing for shared state between
-            the two streams.</p>
-    {% highlight java %}
-DataStream<Integer> someStream = //...
-DataStream<String> otherStream = //...
-
-ConnectedStreams<Integer, String> connectedStreams = 
someStream.connect(otherStream);
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; 
DataStream</td>
-          <td>
-            <p>Similar to map and flatMap on a connected data stream</p>
-    {% highlight java %}
-connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
-    @Override
-    public Boolean map1(Integer value) {
-        return true;
-    }
-
-    @Override
-    public Boolean map2(String value) {
-        return false;
-    }
-});
-connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
-
-   @Override
-   public void flatMap1(Integer value, Collector<String> out) {
-       out.collect(value.toString());
-   }
-
-   @Override
-   public void flatMap2(String value, Collector<String> out) {
-       for (String word: value.split(" ")) {
-         out.collect(word);
-       }
-   }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
-          <td>
-            <p>
-                Split the stream into two or more streams according to some 
criterion.
-                {% highlight java %}
-SplitStream<Integer> split = someDataStream.split(new 
OutputSelector<Integer>() {
-    @Override
-    public Iterable<String> select(Integer value) {
-        List<String> output = new ArrayList<String>();
-        if (value % 2 == 0) {
-            output.add("even");
-        }
-        else {
-            output.add("odd");
-        }
-        return output;
-    }
-});
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
-          <td>
-            <p>
-                Select one or more streams from a split stream.
-                {% highlight java %}
-SplitStream<Integer> split;
-DataStream<Integer> even = split.select("even");
-DataStream<Integer> odd = split.select("odd");
-DataStream<Integer> all = split.select("even","odd");
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream 
&rarr; DataStream</td>
-          <td>
-            <p>
-                Creates a "feedback" loop in the flow, by redirecting the 
output of one operator
-                to some previous operator. This is especially useful for 
defining algorithms that
-                continuously update a model. The following code starts with a 
stream and applies
-               the iteration body continuously. Elements that are greater than 
0 are sent back
-               to the feedback channel, and the rest of the elements are 
forwarded downstream.
-               See <a href="#iterations">iterations</a> for a complete 
description.
-                {% highlight java %}
-IterativeStream<Long> iteration = initialStream.iterate();
-DataStream<Long> iterationBody = iteration.map (/*do something*/);
-DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
-    @Override
-    public boolean filter(Long value) throws Exception {
-        return value > 0;
-    }
-});
-iteration.closeWith(feedback);
-DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
-    @Override
-    public boolean filter(Long value) throws Exception {
-        return value <= 0;
-    }
-});
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>
-                Extracts timestamps from records in order to work with windows
-                that use event time semantics. See <a href="{{ site.baseurl 
}}/dev/event_time.html">Event Time</a>.
-                {% highlight java %}
-stream.assignTimestamps (new TimeStampExtractor() {...});
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-  </tbody>
-</table>
-
-</div>
-
-<div data-lang="scala" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-          <td><strong>Map</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Takes one element and produces one element. A map function that 
doubles the values of the input stream:</p>
-    {% highlight scala %}
-dataStream.map { x => x * 2 }
-    {% endhighlight %}
-          </td>
-        </tr>
-
-        <tr>
-          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Takes one element and produces zero, one, or more elements. A 
flatmap function that splits sentences to words:</p>
-    {% highlight scala %}
-dataStream.flatMap { str => str.split(" ") }
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Evaluates a boolean function for each element and retains those 
for which the function returns true.
-            A filter that filters out zero values:
-            </p>
-    {% highlight scala %}
-dataStream.filter { _ != 0 }
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
-          <td>
-            <p>Logically partitions a stream into disjoint partitions, each 
partition containing elements of the same key.
-            Internally, this is implemented with hash partitioning. See <a 
href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys</a> on how 
to specify keys.
-            This transformation returns a KeyedStream.</p>
-    {% highlight scala %}
-dataStream.keyBy("someKey") // Key by field "someKey"
-dataStream.keyBy(0) // Key by the first element of a Tuple
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
-          <td>
-            <p>A "rolling" reduce on a keyed data stream. Combines the current 
element with the last reduced value and
-            emits the new value.
-                    <br/>
-               <br/>
-            A reduce function that creates a stream of partial sums:</p>
-            {% highlight scala %}
-keyedStream.reduce { _ + _ }
-            {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
-          <td>
-          <p>A "rolling" fold on a keyed data stream with an initial value.
-          Combines the current element with the last folded value and
-          emits the new value.
-          <br/>
-          <br/>
-          <p>A fold function that, when applied on the sequence (1,2,3,4,5),
-          emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
-          {% highlight scala %}
-val result: DataStream[String] =
-    keyedStream.fold("start")((str, i) => { str + "-" + i })
-          {% endhighlight %}
-          </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Aggregations</strong><br>KeyedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Rolling aggregations on a keyed data stream. The difference 
between min
-           and minBy is that min returns the minimum value, whereas minBy returns
-           the element that has the minimum value in this field (same for max 
and maxBy).</p>
-    {% highlight scala %}
-keyedStream.sum(0)
-keyedStream.sum("key")
-keyedStream.min(0)
-keyedStream.min("key")
-keyedStream.max(0)
-keyedStream.max("key")
-keyedStream.minBy(0)
-keyedStream.minBy("key")
-keyedStream.maxBy(0)
-keyedStream.maxBy("key")
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
-          <td>
-            <p>Windows can be defined on already partitioned KeyedStreams. 
Windows group the data in each
-            key according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
-            See <a href="windows.html">windows</a> for a description of 
windows.
-    {% highlight scala %}
-dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // 
Last 5 seconds of data
-    {% endhighlight %}
-        </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>WindowAll</strong><br>DataStream &rarr; 
AllWindowedStream</td>
-          <td>
-              <p>Windows can be defined on regular DataStreams. Windows group 
all the stream events
-              according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
-              See <a href="windows.html">windows</a> for a complete 
description of windows.</p>
-              <p><strong>WARNING:</strong> This is in many cases a 
<strong>non-parallel</strong> transformation. All records will be
-               gathered in one task for the windowAll operator.</p>
-  {% highlight scala %}
-dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 
seconds of data
-  {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Apply</strong><br>WindowedStream &rarr; 
DataStream<br>AllWindowedStream &rarr; DataStream</td>
-          <td>
-            <p>Applies a general function to the window as a whole. Below is a 
function that manually sums the elements of a window.</p>
-            <p><strong>Note:</strong> If you are using a windowAll 
transformation, you need to use an AllWindowFunction instead.</p>
-    {% highlight scala %}
-windowedStream.apply { WindowFunction }
-
-// applying an AllWindowFunction on non-keyed window stream
-allWindowedStream.apply { AllWindowFunction }
-
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Applies a functional reduce function to the window and returns 
the reduced value.</p>
-    {% highlight scala %}
-windowedStream.reduce { _ + _ }
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Fold</strong><br>WindowedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Applies a functional fold function to the window and returns 
the folded value.
-               The example function, when applied on the sequence (1,2,3,4,5),
-               folds the sequence into the string "start-1-2-3-4-5":</p>
-          {% highlight scala %}
-val result: DataStream[String] =
-    windowedStream.fold("start", (str, i) => { str + "-" + i })
-          {% endhighlight %}
-          </td>
-       </tr>
-        <tr>
-          <td><strong>Aggregations on windows</strong><br>WindowedStream 
&rarr; DataStream</td>
-          <td>
-            <p>Aggregates the contents of a window. The difference between min
-           and minBy is that min returns the minimum value, whereas minBy 
returns
-           the element that has the minimum value in this field (same for max 
and maxBy).</p>
-    {% highlight scala %}
-windowedStream.sum(0)
-windowedStream.sum("key")
-windowedStream.min(0)
-windowedStream.min("key")
-windowedStream.max(0)
-windowedStream.max("key")
-windowedStream.minBy(0)
-windowedStream.minBy("key")
-windowedStream.maxBy(0)
-windowedStream.maxBy("key")
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
-          <td>
-            <p>Union of two or more data streams creating a new stream 
containing all the elements from all the streams. Note: If you union a data 
stream
-            with itself you will get each element twice in the resulting 
stream.</p>
-    {% highlight scala %}
-dataStream.union(otherStream1, otherStream2, ...)
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>Join two data streams on a given key and a common window.</p>
-    {% highlight scala %}
-dataStream.join(otherStream)
-    .where(<key selector>).equalTo(<key selector>)
-    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
-    .apply { ... }
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>Cogroups two data streams on a given key and a common 
window.</p>
-    {% highlight scala %}
-dataStream.coGroup(otherStream)
-    .where(0).equalTo(1)
-    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
-    .apply {}
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; 
ConnectedStreams</td>
-          <td>
-            <p>"Connects" two data streams retaining their types, allowing for 
shared state between
-            the two streams.</p>
-    {% highlight scala %}
-someStream : DataStream[Int] = ...
-otherStream : DataStream[String] = ...
-
-val connectedStreams = someStream.connect(otherStream)
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; 
DataStream</td>
-          <td>
-            <p>Similar to map and flatMap on a connected data stream</p>
-    {% highlight scala %}
-connectedStreams.map(
-    (_ : Int) => true,
-    (_ : String) => false
-)
-connectedStreams.flatMap(
-    (_ : Int) => true,
-    (_ : String) => false
-)
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
-          <td>
-            <p>
-                Split the stream into two or more streams according to some 
criterion.
-                {% highlight scala %}
-val split = someDataStream.split(
-  (num: Int) =>
-    (num % 2) match {
-      case 0 => List("even")
-      case 1 => List("odd")
-    }
-)
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
-          <td>
-            <p>
-                Select one or more streams from a split stream.
-                {% highlight scala %}
-
-val even = split select "even"
-val odd = split select "odd"
-val all = split.select("even","odd")
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream  
&rarr; DataStream</td>
-          <td>
-            <p>
-                Creates a "feedback" loop in the flow, by redirecting the 
output of one operator
-                to some previous operator. This is especially useful for 
defining algorithms that
-                continuously update a model. The following code starts with a 
stream and applies
-               the iteration body continuously. Elements that are greater than 
0 are sent back
-               to the feedback channel, and the rest of the elements are 
forwarded downstream.
-               See <a href="#iterations">iterations</a> for a complete 
description.
-                {% highlight scala %}
-initialStream.iterate {
-  iteration => {
-    val iterationBody = iteration.map {/*do something*/}
-    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
-  }
-}
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>
-                Extracts timestamps from records in order to work with windows
-                that use event time semantics.
-                See <a href="{{ site.baseurl }}/dev/event_time.html">Event Time</a>.
-                {% highlight scala %}
-stream.assignTimestamps { timestampExtractor }
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-  </tbody>
-</table>
-
-Extraction from tuples, case classes and collections via anonymous pattern 
matching, like the following:
-{% highlight scala %}
-val data: DataStream[(Int, String, Double)] = // [...]
-data.map {
-  case (id, name, temperature) => // [...]
-}
-{% endhighlight %}
-is not supported by the API out-of-the-box. To use this feature, you should 
use a <a href="scala_api_extensions.html">Scala API extension</a>.
-
-
-</div>
-</div>
-
-The following transformations are available on data streams of Tuples:
-
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td><strong>Project</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>Selects a subset of fields from the tuples
-{% highlight java %}
-DataStream<Tuple3<Integer, Double, String>> in = // [...]
-DataStream<Tuple2<String, Integer>> out = in.project(2,0);
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-</div>
-
-
-# Physical partitioning
-
-Flink also gives low-level control (if desired) on the exact stream 
partitioning after a transformation,
-via the following functions.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td><strong>Custom partitioning</strong><br>DataStream &rarr; 
DataStream</td>
-      <td>
-        <p>
-            Uses a user-defined Partitioner to select the target task for each 
element.
-            {% highlight java %}
-dataStream.partitionCustom(partitioner, "someKey");
-dataStream.partitionCustom(partitioner, 0);
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-     <td><strong>Random partitioning</strong><br>DataStream &rarr; 
DataStream</td>
-     <td>
-       <p>
-            Partitions elements randomly according to a uniform distribution.
-            {% highlight java %}
-dataStream.shuffle();
-            {% endhighlight %}
-       </p>
-     </td>
-   </tr>
-   <tr>
-      <td><strong>Rebalancing (Round-robin 
partitioning)</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>
-            Partitions elements round-robin, creating equal load per 
partition. Useful for performance
-            optimization in the presence of data skew.
-            {% highlight java %}
-dataStream.rebalance();
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-    <tr>
-      <td><strong>Rescaling</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>
-            Partitions elements, round-robin, to a subset of downstream 
operations. This is
-            useful if you want to have pipelines where you, for example, fan 
out from
-            each parallel instance of a source to a subset of several mappers 
to distribute load
-            but don't want the full rebalance that rebalance() would incur. 
This would require only
-            local data transfers instead of transferring data over network, 
depending on
-            other configuration values such as the number of slots of 
TaskManagers.
-        </p>
-        <p>
-            The subset of downstream operations to which the upstream 
operation sends
-            elements depends on the degree of parallelism of both the upstream 
and downstream operation.
-            For example, if the upstream operation has parallelism 2 and the 
downstream operation
-            has parallelism 6, then one upstream operation would distribute 
elements to three
-            downstream operations while the other upstream operation would 
distribute to the other
-            three downstream operations. If, on the other hand, the downstream 
operation has parallelism
-            2 while the upstream operation has parallelism 6 then three 
upstream operations would
-            distribute to one downstream operation while the other three 
upstream operations would
-            distribute to the other downstream operation.
-        </p>
-        <p>
-            In cases where the different parallelisms are not multiples of 
each other one or several
-            downstream operations will have a differing number of inputs from 
upstream operations.
-        </p>
-        <p>
-            Please see this figure for a visualization of the connection 
pattern in the above
-            example:
-        </p>
-
-        <div style="text-align: center">
-            <img src="{{ site.baseurl }}/fig/rescale.svg" alt="Checkpoint 
barriers in data streams" />
-            </div>
-
-
-        <p>
-                    {% highlight java %}
-dataStream.rescale();
-            {% endhighlight %}
-
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>
-            Broadcasts elements to every partition.
-            {% highlight java %}
-dataStream.broadcast();
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-
-<div data-lang="scala" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td><strong>Custom partitioning</strong><br>DataStream &rarr; 
DataStream</td>
-      <td>
-        <p>
-            Uses a user-defined Partitioner to select the target task for each 
element.
-            {% highlight scala %}
-dataStream.partitionCustom(partitioner, "someKey")
-dataStream.partitionCustom(partitioner, 0)
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-     <td><strong>Random partitioning</strong><br>DataStream &rarr; 
DataStream</td>
-     <td>
-       <p>
-            Partitions elements randomly according to a uniform distribution.
-            {% highlight scala %}
-dataStream.shuffle()
-            {% endhighlight %}
-       </p>
-     </td>
-   </tr>
-   <tr>
-      <td><strong>Rebalancing (Round-robin 
partitioning)</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>
-            Partitions elements round-robin, creating equal load per 
partition. Useful for performance
-            optimization in the presence of data skew.
-            {% highlight scala %}
-dataStream.rebalance()
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-    <tr>
-      <td><strong>Rescaling</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>
-            Partitions elements, round-robin, to a subset of downstream 
operations. This is
-            useful if you want to have pipelines where you, for example, fan 
out from
-            each parallel instance of a source to a subset of several mappers 
to distribute load
-            but don't want the full rebalance that rebalance() would incur. 
This would require only
-            local data transfers instead of transferring data over the network, 
depending on
-            other configuration values such as the number of slots of 
TaskManagers.
-        </p>
-        <p>
-            The subset of downstream operations to which the upstream 
operation sends
-            elements depends on the degree of parallelism of both the upstream 
and downstream operation.
-            For example, if the upstream operation has parallelism 2 and the 
downstream operation
-            has parallelism 4, then one upstream operation would distribute 
elements to two
-            downstream operations while the other upstream operation would 
distribute to the other
-            two downstream operations. If, on the other hand, the downstream 
operation has parallelism
-            2 while the upstream operation has parallelism 4 then two upstream 
operations would
-            distribute to one downstream operation while the other two 
upstream operations would
-            distribute to the other downstream operation.
-        </p>
-        <p>
-            In cases where the different parallelisms are not multiples of 
each other one or several
-            downstream operations will have a differing number of inputs from 
upstream operations.
-
-        </p>
-        <p>
-            Please see this figure for a visualization of the connection 
pattern in the above
-            example:
-        </p>
-
-        <div style="text-align: center">
-            <img src="{{ site.baseurl }}/fig/rescale.svg" alt="Checkpoint 
barriers in data streams" />
-            </div>
-
-
-        <p>
-                    {% highlight scala %}
-dataStream.rescale()
-            {% endhighlight %}
-
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>
-            Broadcasts elements to every partition.
-            {% highlight scala %}
-dataStream.broadcast()
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-</div>
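-
-The `Partitioner` used with `partitionCustom()` above is a user-defined function. As a purely
-illustrative sketch (the key type and the hash-based routing are only an example, not a
-recommended scheme), it could look like this:
-
-{% highlight java %}
-// route every element with the same key to the same downstream task
-Partitioner<String> partitioner = new Partitioner<String>() {
-    @Override
-    public int partition(String key, int numPartitions) {
-        // mask the sign bit to keep the index non-negative
-        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
-    }
-};
-
-dataStream.partitionCustom(partitioner, "someKey");
-{% endhighlight %}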
-
-# Task chaining and resource groups
-
-Chaining two subsequent transformations means co-locating them within the same 
thread for better
-performance. Flink by default chains operators if this is possible (e.g., two 
subsequent map
-transformations). The API gives fine-grained control over chaining if desired:
-
-Use `StreamExecutionEnvironment.disableOperatorChaining()` if you want to 
disable chaining in
-the whole job. For more fine-grained control, the following functions are 
available. Note that
-these functions can only be used right after a DataStream transformation as 
they refer to the
-previous transformation. For example, you can use 
`someStream.map(...).startNewChain()`, but
-you cannot use `someStream.startNewChain()`.
-
-A resource group is a slot in Flink, see
-[slots]({{site.baseurl}}/setup/config.html#configuring-taskmanager-processing-slots).
 You can
-manually isolate operators in separate slots if desired.
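-
-As a minimal sketch (the environment setup is the standard one; nothing else here is specific
-to chaining), job-wide chaining can be switched off on the execution environment, while the
-per-operator controls are listed in the tables below:
-
-{% highlight java %}
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-// no operators of this job will be chained into the same thread;
-// use startNewChain() / disableChaining() below for per-operator control instead
-env.disableOperatorChaining();
-{% endhighlight %}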
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td>Start new chain</td>
-      <td>
-        <p>Begin a new chain, starting with this operator. The two
-       mappers will be chained, and filter will not be chained to
-       the first mapper.
-{% highlight java %}
-someStream.filter(...).map(...).startNewChain().map(...);
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td>Disable chaining</td>
-      <td>
-        <p>Do not chain the map operator
-{% highlight java %}
-someStream.map(...).disableChaining();
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-    <tr>
-      <td>Set slot sharing group</td>
-      <td>
-        <p>Set the slot sharing group of an operation. Flink will put 
operations with the same
-        slot sharing group into the same slot while keeping operations that 
don't have the
-        slot sharing group in other slots. This can be used to isolate slots. 
The slot sharing
-        group is inherited from input operations if all input operations are 
in the same slot
-        sharing group.
-        The name of the default slot sharing group is "default"; operations 
can explicitly
-        be put into this group by calling slotSharingGroup("default").
-{% highlight java %}
-someStream.filter(...).slotSharingGroup("name");
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-
-<div data-lang="scala" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td>Start new chain</td>
-      <td>
-        <p>Begin a new chain, starting with this operator. The two
-       mappers will be chained, and filter will not be chained to
-       the first mapper.
-{% highlight scala %}
-someStream.filter(...).map(...).startNewChain().map(...)
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td>Disable chaining</td>
-      <td>
-        <p>Do not chain the map operator
-{% highlight scala %}
-someStream.map(...).disableChaining()
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  <tr>
-      <td>Set slot sharing group</td>
-      <td>
-        <p>Set the slot sharing group of an operation. Flink will put 
operations with the same
-        slot sharing group into the same slot while keeping operations that 
don't have the
-        slot sharing group in other slots. This can be used to isolate slots. 
The slot sharing
-        group is inherited from input operations if all input operations are 
in the same slot
-        sharing group.
-        The name of the default slot sharing group is "default"; operations 
can explicitly
-        be put into this group by calling slotSharingGroup("default").
-{% highlight scala %}
-someStream.filter(...).slotSharingGroup("name")
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-</div>
-
-
-{% top %}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/31b86f60/docs/dev/stream/operators/asyncio.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/asyncio.md 
b/docs/dev/stream/operators/asyncio.md
new file mode 100644
index 0000000..1ea0792
--- /dev/null
+++ b/docs/dev/stream/operators/asyncio.md
@@ -0,0 +1,253 @@
+---
+title: "Asynchronous I/O for External Data Access"
+nav-title: "Async I/O"
+nav-parent_id: streaming_operators
+nav-pos: 60
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* ToC
+{:toc}
+
+This page explains the use of Flink's API for asynchronous I/O with external 
data stores.
+For users not familiar with asynchronous or event-driven programming, an 
article about Futures and
+event-driven programming may be useful preparation.
+
+Note: Details about the design and implementation of the asynchronous I/O 
utility can be found in the proposal and design document
+[FLIP-12: Asynchronous I/O Design and 
Implementation](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673).
+
+
+## The need for Asynchronous I/O Operations
+
+When interacting with external systems (for example when enriching stream 
events with data stored in a database), one needs to take care
+that communication delay with the external system does not dominate the 
streaming application's total work.
+
+Naively accessing data in the external database, for example in a 
`MapFunction`, typically means **synchronous** interaction:
+A request is sent to the database and the `MapFunction` waits until the 
response has been received. In many cases, this waiting
+makes up the vast majority of the function's time.
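+
+For illustration only (the `DatabaseClient`, its blocking `blockingQuery()` method, and the
+`host`, `port` and `credentials` placeholders are hypothetical, mirroring the example further
+below), such a synchronous lookup could look like this:
+
+{% highlight java %}
+// Anti-pattern sketch: a MapFunction that blocks on every single database round trip
+class SyncDatabaseRequest extends RichMapFunction<String, Tuple2<String, String>> {
+
+    /** The hypothetical database client with a blocking lookup call */
+    private transient DatabaseClient client;
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        client = new DatabaseClient(host, port, credentials);
+    }
+
+    @Override
+    public Tuple2<String, String> map(String key) throws Exception {
+        // the function waits here for the full round trip of every request
+        String result = client.blockingQuery(key);
+        return new Tuple2<>(key, result);
+    }
+}
+{% endhighlight %}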
+
+Asynchronous interaction with the database means that a single parallel 
function instance can handle many requests concurrently and
+receive the responses concurrently. That way, the waiting time can be 
overlaid with sending other requests and
+receiving responses. At the very least, the waiting time is amortized over 
multiple requests. This leads in most cases to much higher
+streaming throughput.
+
+<img src="{{ site.baseurl }}/fig/async_io.svg" class="center" width="50%" />
+
+*Note:* Improving throughput by just scaling the `MapFunction` to a very high 
parallelism is in some cases possible as well, but usually
+comes at a very high resource cost: Having many more parallel MapFunction 
instances means more tasks, threads, Flink-internal network
+connections, network connections to the database, buffers, and general 
internal bookkeeping overhead.
+
+
+## Prerequisites
+
+As illustrated in the section above, implementing proper asynchronous I/O to a 
database (or key/value store) requires a client
+to that database that supports asynchronous requests. Many popular databases 
offer such a client.
+
+In the absence of such a client, one can try to turn a synchronous client 
into a limited concurrent client by creating
+multiple clients and handling the synchronous calls with a thread pool. 
However, this approach is usually less
+efficient than a proper asynchronous client.
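+
+A rough sketch of that fallback (again with a hypothetical blocking client) submits the
+synchronous calls to a fixed-size thread pool, whose size bounds the number of requests that
+can be outstanding at the same time:
+
+{% highlight java %}
+// hypothetical fallback: wrap a blocking client in a thread pool to obtain future-style results
+ExecutorService pool = Executors.newFixedThreadPool(30);
+
+// at most 30 lookups are in flight concurrently; each submit() returns immediately with a Future
+Future<String> resultFuture = pool.submit(() -> client.blockingQuery(key));
+{% endhighlight %}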
+
+
+## Async I/O API
+
+Flink's Async I/O API allows users to use asynchronous request clients with 
data streams. The API handles the integration with
+data streams, as well as handling order, event time, fault tolerance, etc.
+
+Assuming one has an asynchronous client for the target database, three parts 
are needed to implement a stream transformation
+with asynchronous I/O against the database:
+
+  - An implementation of `AsyncFunction` that dispatches the requests
+  - A *callback* that takes the result of the operation and hands it to the 
`AsyncCollector`
+  - Applying the async I/O operation on a DataStream as a transformation
+
+The following code example illustrates the basic pattern:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// This example implements the asynchronous request and callback with Futures 
that have the
+// interface of Java 8's futures (which is the same one followed by Flink's 
Future)
+
+/**
+ * An implementation of the 'AsyncFunction' that sends requests and sets the 
callback.
+ */
+class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, 
String>> {
+
+    /** The database specific client that can issue concurrent requests with 
callbacks */
+    private transient DatabaseClient client;
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        client = new DatabaseClient(host, port, credentials);
+    }
+
+    @Override
+    public void close() throws Exception {
+        client.close();
+    }
+
+    @Override
+    public void asyncInvoke(final String str, final 
AsyncCollector<Tuple2<String, String>> asyncCollector) throws Exception {
+
+        // issue the asynchronous request, receive a future for result
+        Future<String> resultFuture = client.query(str);
+
+        // set the callback to be executed once the request by the client is 
complete
+        // the callback simply forwards the result to the collector
+        resultFuture.thenAccept( (String result) -> {
+
+            asyncCollector.collect(Collections.singleton(new Tuple2<>(str, 
result)));
+         
+        });
+    }
+}
+
+// create the original stream
+DataStream<String> stream = ...;
+
+// apply the async I/O transformation
+DataStream<Tuple2<String, String>> resultStream =
+    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, 
TimeUnit.MILLISECONDS, 100);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/**
+ * An implementation of the 'AsyncFunction' that sends requests and sets the 
callback.
+ */
+class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
+
+    /** The database specific client that can issue concurrent requests with 
callbacks */
+    lazy val client: DatabaseClient = new DatabaseClient(host, port, 
credentials)
+
+    /** The context used for the future callbacks */
+    implicit lazy val executor: ExecutionContext = 
ExecutionContext.fromExecutor(Executors.directExecutor())
+
+
+    override def asyncInvoke(str: String, asyncCollector: 
AsyncCollector[(String, String)]): Unit = {
+
+        // issue the asynchronous request, receive a future for the result
+        val resultFuture: Future[String] = client.query(str)
+
+        // set the callback to be executed once the request by the client is 
complete
+        // the callback simply forwards the result to the collector
+        resultFuture.onSuccess {
+            case result: String => asyncCollector.collect(Iterable((str, 
result)));
+        }
+    }
+}
+
+// create the original stream
+val stream: DataStream[String] = ...
+
+// apply the async I/O transformation
+val resultStream: DataStream[(String, String)] =
+    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, 
TimeUnit.MILLISECONDS, 100)
+
+{% endhighlight %}
+</div>
+</div>
+
+**Important note**: The `AsyncCollector` is completed with the first call of 
`AsyncCollector.collect`.
+All subsequent `collect` calls will be ignored.
+
+The following two parameters control the asynchronous operations:
+
+  - **Timeout**: The timeout defines how long an asynchronous request may take 
before it is considered failed. This parameter
+    guards against dead/failed requests.
+
+  - **Capacity**: This parameter defines how many asynchronous requests may be 
in progress at the same time.
+    Even though the async I/O approach typically leads to much better 
throughput, the operator can still be the bottleneck in
+    the streaming application. Limiting the number of concurrent requests 
ensures that the operator will not
+    accumulate an ever-growing backlog of pending requests, but that it will 
trigger backpressure once the capacity
+    is exhausted.
+
+
+### Order of Results
+
+The concurrent requests issued by the `AsyncFunction` frequently complete in 
some undefined order, based on which request finished first.
+To control in which order the resulting records are emitted, Flink offers two 
modes:
+
+  - **Unordered**: Result records are emitted as soon as the asynchronous 
request finishes.
+    The order of the records in the stream is different after the async I/O 
operator than before.
+    This mode has the lowest latency and lowest overhead, when used with 
*processing time* as the basic time characteristic.
+    Use `AsyncDataStream.unorderedWait(...)` for this mode.
+
+  - **Ordered**: In that case, the stream order is preserved. Result records 
are emitted in the same order as the asynchronous
+    requests are triggered (the order of the operator's input records). To 
achieve that, the operator buffers a result record
+    until all its preceding records are emitted (or timed out).
+    This usually introduces some amount of extra latency and some overhead in 
checkpointing, because records or results are maintained
+    in the checkpointed state for a longer time, compared to the unordered 
mode.
+    Use `AsyncDataStream.orderedWait(...)` for this mode.
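+
+For example, reusing the hypothetical `AsyncDatabaseRequest` from the code above, the two modes
+differ only in the factory method that is used; the timeout and capacity parameters keep the
+meaning described in the previous section:
+
+{% highlight java %}
+// results may be emitted in a different order than the input records
+DataStream<Tuple2<String, String>> unordered =
+    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
+
+// results are emitted in the order of the input records
+DataStream<Tuple2<String, String>> ordered =
+    AsyncDataStream.orderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
+{% endhighlight %}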
+
+
+### Event Time
+
+When the streaming application works with [event time]({{ site.baseurl 
}}/dev/event_time.html), watermarks will be handled correctly by the
+asynchronous I/O operator. That means concretely the following for the two 
order modes:
+
+  - **Unordered**: Watermarks do not overtake records and vice versa, meaning 
watermarks establish an *order boundary*.
+    Records are emitted unordered only between watermarks.
+    A record occurring after a certain watermark will be emitted only after 
that watermark was emitted.
+    The watermark in turn will be emitted only after all result records from 
inputs before that watermark were emitted.
+
+    That means that in the presence of watermarks, the *unordered* mode 
introduces some of the same latency and management
+    overhead as the *ordered* mode does. The amount of that overhead depends 
on the watermark frequency.
+
+  - **Ordered**: The order of watermarks and records is preserved, just like the order 
between records is preserved. There is no
+    significant change in overhead, compared to working with *processing time*.
+
+Please recall that *Ingestion Time* is a special case of *event time* with 
automatically generated watermarks that
+are based on the source's processing time.
+
+
+### Fault Tolerance Guarantees
+
+The asynchronous I/O operator offers full exactly-once fault tolerance 
guarantees. It stores the records for in-flight
+asynchronous requests in checkpoints and restores/re-triggers the requests 
when recovering from a failure.
+
+
+### Implementation Tips
+
+For implementations with *Futures* that have an *Executor* (or 
*ExecutionContext* in Scala) for callbacks, we suggest using a 
`DirectExecutor`, because the
+callback typically does minimal work, and a `DirectExecutor` avoids an 
additional thread-to-thread handover overhead. The callback typically only hands
+the result to the `AsyncCollector`, which adds it to the output buffer. From 
there, the heavy logic that includes record emission and interaction
+with the checkpoint bookkeeping happens in a dedicated thread pool anyway.
+
+A `DirectExecutor` can be obtained via 
`org.apache.flink.runtime.concurrent.Executors.directExecutor()` or
+`com.google.common.util.concurrent.MoreExecutors.directExecutor()`.
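+
+As an illustrative sketch (assuming the client's future follows the Java 8 `CompletableFuture`
+interface, as in the example above), the callback can be bound to such a direct executor
+explicitly:
+
+{% highlight java %}
+// run the callback in the thread that completes the future, avoiding a thread handover
+Executor directExecutor = org.apache.flink.runtime.concurrent.Executors.directExecutor();
+
+resultFuture.thenAcceptAsync(
+    (String result) -> asyncCollector.collect(Collections.singleton(new Tuple2<>(str, result))),
+    directExecutor);
+{% endhighlight %}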
+
+
+### Caveat
+
+**The AsyncFunction is not called multi-threaded**
+
+A common confusion that we want to explicitly point out here is that the 
`AsyncFunction` is not called in a multi-threaded fashion.
+There exists only one instance of the `AsyncFunction` and it is called 
sequentially for each record in the respective partition
+of the stream. Unless the `asyncInvoke(...)` method returns fast and relies on 
a callback (by the client), it will not result in
+proper asynchronous I/O.
+
+For example, the following patterns result in a blocking `asyncInvoke(...)` 
function and thus defeat the asynchronous behavior:
+
+  - Using a database client whose lookup/query method call blocks until the 
result has been received back
+
+  - Blocking/waiting on the future-type objects returned by an asynchronous 
client inside the `asyncInvoke(...)` method
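+
+For example, the second pattern above, blocking on the future inside `asyncInvoke(...)`, would
+look roughly like the following (purely illustrative, reusing the hypothetical client) and must
+be avoided:
+
+{% highlight java %}
+@Override
+public void asyncInvoke(final String str, final AsyncCollector<Tuple2<String, String>> asyncCollector) throws Exception {
+
+    // BAD: get() blocks the single caller thread until the response arrives,
+    // so requests are effectively processed one after the other again
+    String result = client.query(str).get();
+
+    asyncCollector.collect(Collections.singleton(new Tuple2<>(str, result)));
+}
+{% endhighlight %}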
+
