http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/event_timestamps_watermarks.md
----------------------------------------------------------------------
diff --git a/docs/dev/event_timestamps_watermarks.md b/docs/dev/event_timestamps_watermarks.md
new file mode 100644
index 0000000..8d152df
--- /dev/null
+++ b/docs/dev/event_timestamps_watermarks.md
@@ -0,0 +1,329 @@
+---
+title: "Generating Timestamps / Watermarks"
+nav-parent_id: event_time
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* toc
+{:toc}
+
+
+This section is relevant for programs running on **Event Time**. For an introduction to *Event Time*,
+*Processing Time*, and *Ingestion Time*, please refer to the [event time introduction]({{ site.baseurl }}/dev/event_time.html).
+
+To work with *Event Time*, streaming programs need to set the *time characteristic* accordingly.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
+</div>
+
+## Assigning Timestamps
+
+In order to work with *Event Time*, Flink needs to know the events' *timestamps*, meaning each element in the
+stream needs to have its event timestamp *assigned*. This is usually done by accessing/extracting the
+timestamp from some field in the element.
+
+Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
+the progress in event time.
+
+There are two ways to assign timestamps and generate Watermarks:
+
+  1. Directly in the data stream source
+  2. Via a timestamp assigner / watermark generator: in Flink, timestamp assigners also define the watermarks to be emitted
+
+<span class="label label-danger">Attention</span> Both timestamps and watermarks are specified as
+milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
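To make the unit concrete, here is a minimal sketch in plain Java (no Flink dependency) that turns an ISO-8601 instant into the millisecond value described above. The class `EpochMillis` and the method `fromIso` are hypothetical names chosen for this illustration.

```java
import java.time.Instant;

// Hypothetical helper, not part of Flink: converts an ISO-8601 instant
// into milliseconds since 1970-01-01T00:00:00Z.
final class EpochMillis {

    static long fromIso(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(fromIso("1970-01-01T00:00:00Z"));     // 0
        System.out.println(fromIso("1970-01-01T00:00:10.500Z")); // 10500
    }
}
```

A value produced this way is the kind of `long` that would be passed as an element timestamp or wrapped in a watermark.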
+
+### Source Functions with Timestamps and Watermarks
+
+Stream sources can also directly assign timestamps to the elements they produce and emit Watermarks. In that case,
+no Timestamp Assigner is needed.
+
+To assign a timestamp to an element in the source directly, the source must use the `collectWithTimestamp(...)`
+method on the `SourceContext`. To generate Watermarks, the source must call the `emitWatermark(Watermark)` function.
+
+Below is a simple example of a source *(non-checkpointed)* that assigns timestamps and generates Watermarks
+depending on special events:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void run(SourceContext<MyType> ctx) throws Exception {
+       while (/* condition */) {
+               MyType next = getNext();
+               ctx.collectWithTimestamp(next, next.getEventTimestamp());
+
+               if (next.hasWatermarkTime()) {
+                       ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
+               }
+       }
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+override def run(ctx: SourceContext[MyType]): Unit = {
+       while (/* condition */) {
+               val next: MyType = getNext()
+               ctx.collectWithTimestamp(next, next.eventTimestamp)
+
+               if (next.hasWatermarkTime) {
+                       ctx.emitWatermark(new Watermark(next.getWatermarkTime))
+               }
+       }
+}
+{% endhighlight %}
+</div>
+</div>
+
+*Note:* If the streaming program uses a TimestampAssigner on a stream where elements have a timestamp already,
+those timestamps will be overwritten by the TimestampAssigner. Similarly, Watermarks will be overwritten as well.
+
+
+### Timestamp Assigners / Watermark Generators
+
+Timestamp Assigners take a stream and produce a new stream with timestamped elements and watermarks. If the
+original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them.
+
+Timestamp assigners are usually specified immediately after the data source, but this is not strictly required.
+A common pattern is, for example, to parse (*MapFunction*) and filter (*FilterFunction*) before the timestamp assigner.
+In any case, the timestamp assigner needs to be specified before the first operation on event time
+(such as the first window operation). As a special case, when using Kafka as the source of a streaming job,
+Flink allows the specification of a timestamp assigner / watermark emitter inside
+the source (or consumer) itself. More information on how to do so can be found in the
+[Kafka Connector documentation]({{ site.baseurl }}/dev/connectors/kafka.html).
+
+
+**NOTE:** The remainder of this section presents the main interfaces a programmer has
+to implement in order to create her own timestamp extractors/watermark emitters.
+To see the pre-implemented extractors that ship with Flink, please refer to the
+[Pre-defined Timestamp Extractors / Watermark Emitters]({{ site.baseurl }}/dev/event_timestamp_extractors.html) page.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+DataStream<MyEvent> stream = env.readFile(
+        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
+        FilePathFilter.createDefaultFilter(), typeInfo);
+
+DataStream<MyEvent> withTimestampsAndWatermarks = stream
+        .filter( event -> event.severity() == WARNING )
+        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
+
+withTimestampsAndWatermarks
+        .keyBy( (event) -> event.getGroup() )
+        .timeWindow(Time.seconds(10))
+        .reduce( (a, b) -> a.add(b) )
+        .addSink(...);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+val stream: DataStream[MyEvent] = env.readFile(
+         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
+         FilePathFilter.createDefaultFilter());
+
+val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
+        .filter( _.severity == WARNING )
+        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
+
+withTimestampsAndWatermarks
+        .keyBy( _.getGroup )
+        .timeWindow(Time.seconds(10))
+        .reduce( (a, b) => a.add(b) )
+        .addSink(...)
+{% endhighlight %}
+</div>
+</div>
+
+
+#### **With Periodic Watermarks**
+
+The `AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending
+on the stream elements, or purely based on processing time).
+
+The interval (every *n* milliseconds) in which the watermark will be generated is defined via
+`ExecutionConfig.setAutoWatermarkInterval(...)`. Each time, the assigner's `getCurrentWatermark()` method will be
+called, and a new Watermark will be emitted if the returned Watermark is non-null and larger than the previous
+Watermark.
+
+Two simple examples of timestamp assigners with periodic watermark generation are below.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/**
+ * This generator generates watermarks assuming that elements come out of order to a certain degree only.
+ * The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest
+ * elements for timestamp t.
+ */
+public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
+
+    private final long maxOutOfOrderness = 3500; // 3.5 seconds
+
+    private long currentMaxTimestamp;
+
+    @Override
+    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
+        long timestamp = element.getCreationTime();
+        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
+        return timestamp;
+    }
+
+    @Override
+    public Watermark getCurrentWatermark() {
+        // return the watermark as current highest timestamp minus the out-of-orderness bound
+        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
+    }
+}
+
+/**
+ * This generator generates watermarks that are lagging behind processing time by a certain amount.
+ * It assumes that elements arrive in Flink after at most a certain time.
+ */
+public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
+
+    private final long maxTimeLag = 5000; // 5 seconds
+
+    @Override
+    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
+        return element.getCreationTime();
+    }
+
+    @Override
+    public Watermark getCurrentWatermark() {
+        // return the watermark as current time minus the maximum time lag
+        return new Watermark(System.currentTimeMillis() - maxTimeLag);
+    }
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/**
+ * This generator generates watermarks assuming that elements come out of order to a certain degree only.
+ * The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest
+ * elements for timestamp t.
+ */
+class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
+
+    val maxOutOfOrderness = 3500L // 3.5 seconds
+
+    // a concrete class must initialize its fields; `_` is the default value (0 for Long)
+    var currentMaxTimestamp: Long = _
+
+    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
+        val timestamp = element.getCreationTime()
+        currentMaxTimestamp = math.max(timestamp, currentMaxTimestamp)
+        timestamp
+    }
+
+    override def getCurrentWatermark(): Watermark = {
+        // return the watermark as current highest timestamp minus the out-of-orderness bound
+        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
+    }
+}
+
+/**
+ * This generator generates watermarks that are lagging behind processing time by a certain amount.
+ * It assumes that elements arrive in Flink after at most a certain time.
+ */
+class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
+
+    val maxTimeLag = 5000L; // 5 seconds
+
+    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
+        element.getCreationTime
+    }
+
+    override def getCurrentWatermark(): Watermark = {
+        // return the watermark as current time minus the maximum time lag
+        new Watermark(System.currentTimeMillis() - maxTimeLag)
+    }
+}
+{% endhighlight %}
+</div>
+</div>
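The emission rule for periodic watermarks (a new watermark is emitted only if it is larger than the previous one) can be tried out without a Flink cluster. The following is a standalone simulation in plain Java under stated assumptions: `PeriodicWatermarkSketch`, `onElement` and `onPeriodicTick` are hypothetical names, not Flink API, and the sketch only mirrors the bounded-out-of-orderness logic shown above.

```java
// Standalone simulation; none of these names are Flink API.
final class PeriodicWatermarkSketch {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp = Long.MIN_VALUE; // no element seen yet
    private long lastEmitted = Long.MIN_VALUE;         // no watermark emitted yet

    PeriodicWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Mirrors extractTimestamp(): remember the highest timestamp seen so far.
    void onElement(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    // Mirrors one periodic tick: returns the watermark to emit,
    // or null if the watermark would not advance.
    Long onPeriodicTick() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return null; // nothing to base a watermark on
        }
        long candidate = currentMaxTimestamp - maxOutOfOrderness;
        if (candidate <= lastEmitted) {
            return null; // not larger than the previous watermark
        }
        lastEmitted = candidate;
        return candidate;
    }

    public static void main(String[] args) {
        PeriodicWatermarkSketch generator = new PeriodicWatermarkSketch(3500);
        generator.onElement(10_000);
        generator.onElement(8_000); // out-of-order element, does not lower the maximum
        System.out.println(generator.onPeriodicTick()); // 6500
        System.out.println(generator.onPeriodicTick()); // null (no progress)
        generator.onElement(12_000);
        System.out.println(generator.onPeriodicTick()); // 8500
    }
}
```

Unlike the documentation example, the sketch starts from `Long.MIN_VALUE` rather than `0`, so it emits nothing before the first element arrives; this is a design choice of the sketch only.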
+
+#### **With Punctuated Watermarks**
+
+To generate Watermarks whenever a certain event indicates that a new watermark can be generated, use the
+`AssignerWithPunctuatedWatermarks`. For this class, Flink will first call the `extractTimestamp(...)` method
+to assign the element a timestamp, and then immediately call for that element the
+`checkAndGetNextWatermark(...)` method.
+
+The `checkAndGetNextWatermark(...)` method gets the timestamp that was assigned in the `extractTimestamp(...)`
+method, and can decide whether it wants to generate a Watermark. Whenever the `checkAndGetNextWatermark(...)`
+method returns a non-null Watermark, and that Watermark is larger than the latest previous Watermark, that
+new Watermark will be emitted.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {
+
+       @Override
+       public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
+               return element.getCreationTime();
+       }
+
+       @Override
+       public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
+               return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
+       }
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
+
+       override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
+               element.getCreationTime
+       }
+
+       override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
+               if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
+       }
+}
+{% endhighlight %}
+</div>
+</div>
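The punctuated rule can be simulated in the same spirit. This plain-Java sketch assumes each element carries a boolean marker; `PunctuatedWatermarkSketch` and its `onElement` method are hypothetical and only imitate the "non-null and larger than the previous watermark" check described above.

```java
// Standalone simulation; none of these names are Flink API.
final class PunctuatedWatermarkSketch {

    private long lastEmitted = Long.MIN_VALUE; // no watermark emitted yet

    // Returns the watermark to emit for this element, or null if none is emitted.
    Long onElement(long extractedTimestamp, boolean hasWatermarkMarker) {
        if (!hasWatermarkMarker) {
            return null; // the element does not ask for a watermark
        }
        if (extractedTimestamp <= lastEmitted) {
            return null; // not larger than the latest previous watermark
        }
        lastEmitted = extractedTimestamp;
        return extractedTimestamp;
    }

    public static void main(String[] args) {
        PunctuatedWatermarkSketch assigner = new PunctuatedWatermarkSketch();
        System.out.println(assigner.onElement(1_000, false)); // null
        System.out.println(assigner.onElement(2_000, true));  // 2000
        System.out.println(assigner.onElement(1_500, true));  // null (would go backwards)
        System.out.println(assigner.onElement(3_000, true));  // 3000
    }
}
```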
+
+*Note:* It is possible to generate a watermark on every single event. However, because each watermark causes some
+computation downstream, an excessive number of watermarks degrades performance.

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/index.md b/docs/dev/index.md
new file mode 100644
index 0000000..67916c1
--- /dev/null
+++ b/docs/dev/index.md
@@ -0,0 +1,25 @@
+---
+title: "Application Development"
+nav-id: dev
+nav-title: '<i class="fa fa-code" aria-hidden="true"></i> Application Development'
+nav-parent_id: root
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/java8.md
----------------------------------------------------------------------
diff --git a/docs/dev/java8.md b/docs/dev/java8.md
new file mode 100644
index 0000000..3792e27
--- /dev/null
+++ b/docs/dev/java8.md
@@ -0,0 +1,196 @@
+---
+title: "Java 8"
+nav-parent_id: apis
+nav-pos: 105
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Java 8 introduces several new language features designed for faster and clearer coding. With the most important feature,
+the so-called "Lambda Expressions", Java 8 opens the door to functional programming. Lambda Expressions allow for implementing and
+passing functions in a straightforward way without having to declare additional (anonymous) classes.
+
+The newest version of Flink supports the usage of Lambda Expressions for all operators of the Java API.
+This document shows how to use Lambda Expressions and describes current limitations. For a general introduction to the
+Flink API, please refer to the [Programming Guide]({{ site.baseurl }}/dev/api_concepts.html).
+
+* TOC
+{:toc}
+
+### Examples
+
+The following example illustrates how to implement a simple, inline `map()` function that squares its input using a Lambda Expression.
+The types of the input `i` and of the output of the `map()` function need not be declared, as they are inferred by the Java 8 compiler.
+
+~~~java
+env.fromElements(1, 2, 3)
+// returns the squared i
+.map(i -> i*i)
+.print();
+~~~
+
+The next two examples show different implementations of a function that uses a `Collector` for output.
+Functions such as `flatMap()` require an output type (in this case `String`) to be defined for the `Collector` in order to be type-safe.
+If the `Collector` type cannot be inferred from the surrounding context, it needs to be declared manually in the Lambda Expression's parameter list.
+Otherwise the output will be treated as type `Object`, which can lead to undesired behaviour.
+
+~~~java
+DataSet<Integer> input = env.fromElements(1, 2, 3);
+
+// collector type must be declared
+input.flatMap((Integer number, Collector<String> out) -> {
+    StringBuilder builder = new StringBuilder();
+    for(int i = 0; i < number; i++) {
+        builder.append("a");
+        out.collect(builder.toString());
+    }
+})
+// returns (on separate lines) "a", "a", "aa", "a", "aa", "aaa"
+.print();
+~~~
+
+~~~java
+DataSet<Integer> input = env.fromElements(1, 2, 3);
+
+// collector type need not be declared, it is inferred from the type of the dataset
+DataSet<String> manyALetters = input.flatMap((number, out) -> {
+    StringBuilder builder = new StringBuilder();
+    for(int i = 0; i < number; i++) {
+       builder.append("a");
+       out.collect(builder.toString());
+    }
+});
+
+// returns (on separate lines) "a", "a", "aa", "a", "aa", "aaa"
+manyALetters.print();
+~~~
+
+The following code demonstrates a word count which makes extensive use of Lambda Expressions.
+
+~~~java
+DataSet<String> input = env.fromElements("Please count", "the words", "but not this");
+
+// filter out strings that contain "not"
+input.filter(line -> !line.contains("not"))
+// split each line by space
+.map(line -> line.split(" "))
+// emit a pair <word,1> for each array element
+.flatMap((String[] wordArray, Collector<Tuple2<String, Integer>> out)
+    -> Arrays.stream(wordArray).forEach(t -> out.collect(new Tuple2<>(t, 1)))
+    )
+// group and sum up
+.groupBy(0).sum(1)
+// print
+.print();
+~~~
+
+### Compiler Limitations
+Currently, Flink only supports jobs containing Lambda Expressions completely if they are **compiled with the Eclipse JDT compiler contained in Eclipse Luna 4.4.2 (and above)**.
+
+Only the Eclipse JDT compiler preserves the generic type information necessary to use the entire Lambda Expressions feature type-safely.
+Other compilers such as the OpenJDK's and Oracle JDK's `javac` throw away all generic parameters related to Lambda Expressions. This means that types such as `Tuple2<String, Integer>` or `Collector<String>` declared as a Lambda function input or output parameter will be pruned to `Tuple2` or `Collector` in the compiled `.class` files, which is too little information for the Flink Compiler.
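The general effect can be observed with plain reflection. The sketch below is only an illustration and does not reproduce Flink's type extraction: it compares an anonymous class with a lambda and checks whether the runtime class still records the type arguments of the implemented interface. The class `LambdaErasureSketch` and its methods are hypothetical names.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Illustration only: inspects what reflection can recover from a function object.
final class LambdaErasureSketch {

    // True if the object's class records type arguments for its first interface.
    static boolean keepsTypeArguments(Object function) {
        Type[] interfaces = function.getClass().getGenericInterfaces();
        return interfaces.length > 0 && interfaces[0] instanceof ParameterizedType;
    }

    // Anonymous class: compiled to a real class file that carries a generic signature.
    static Function<String, Integer> anonymous() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
    }

    // Lambda: the implementing class is generated at runtime against the raw interface.
    static Function<String, Integer> lambda() {
        return s -> s.length();
    }

    public static void main(String[] args) {
        System.out.println("anonymous class keeps type arguments: " + keepsTypeArguments(anonymous()));
        System.out.println("lambda keeps type arguments: " + keepsTypeArguments(lambda()));
    }
}
```

This is why a framework that needs `String` and `Integer` at runtime can read them from the anonymous class but has to look elsewhere for a lambda.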
+
+How to compile a Flink job that contains Lambda Expressions with the JDT compiler will be covered in the next section.
+
+However, it is possible to implement functions such as `map()` or `filter()` with Lambda Expressions in Java 8 compilers other than the Eclipse JDT compiler as long as the function has no `Collector`s or `Iterable`s *and* only if the function handles unparameterized types such as `Integer`, `Long`, `String`, `MyOwnClass` (types without Generics!).
+
+#### Compile Flink jobs with the Eclipse JDT compiler and Maven
+
+If you are using the Eclipse IDE, you can run and debug your Flink code within the IDE without any problems after some configuration steps. The Eclipse IDE by default compiles its Java sources with the Eclipse JDT compiler. The next section describes how to configure the Eclipse IDE.
+
+If you are using a different IDE such as IntelliJ IDEA or you want to package your Jar-File with Maven to run your job on a cluster, you need to modify your project's `pom.xml` file and build your program with Maven. The [quickstart]({{site.baseurl}}/quickstart/setup_quickstart.html) contains preconfigured Maven projects which can be used for new projects or as a reference. Uncomment the mentioned lines in your generated quickstart `pom.xml` file if you want to use Java 8 with Lambda Expressions.
+
+Alternatively, you can manually insert the following lines into your Maven `pom.xml` file. Maven will then use the Eclipse JDT compiler for compilation.
+
+~~~xml
+<!-- put these lines under "project/build/pluginManagement/plugins" of your pom.xml -->
+
+<plugin>
+    <!-- Use compiler plugin with tycho as the adapter to the JDT compiler. -->
+    <artifactId>maven-compiler-plugin</artifactId>
+    <configuration>
+        <source>1.8</source>
+        <target>1.8</target>
+        <compilerId>jdt</compilerId>
+    </configuration>
+    <dependencies>
+        <!-- This dependency provides the implementation of compiler "jdt": -->
+        <dependency>
+            <groupId>org.eclipse.tycho</groupId>
+            <artifactId>tycho-compiler-jdt</artifactId>
+            <version>0.21.0</version>
+        </dependency>
+    </dependencies>
+</plugin>
+~~~
+
+If you are using Eclipse for development, the m2e plugin might complain about the inserted lines above and mark your `pom.xml` as invalid. If so, insert the following lines into your `pom.xml`.
+
+~~~xml
+<!-- put these lines under "project/build/pluginManagement/plugins/plugin[groupId="org.eclipse.m2e", artifactId="lifecycle-mapping"]/configuration/lifecycleMappingMetadata/pluginExecutions" of your pom.xml -->
+
+<pluginExecution>
+    <pluginExecutionFilter>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <versionRange>[3.1,)</versionRange>
+        <goals>
+            <goal>testCompile</goal>
+            <goal>compile</goal>
+        </goals>
+    </pluginExecutionFilter>
+    <action>
+        <ignore></ignore>
+    </action>
+</pluginExecution>
+~~~
+
+#### Run and debug Flink jobs within the Eclipse IDE
+
+First of all, make sure you are running a current version of Eclipse IDE (4.4.2 or later). Also make sure that you have a Java 8 Runtime Environment (JRE) installed in Eclipse IDE (`Window` -> `Preferences` -> `Java` -> `Installed JREs`).
+
+Create/Import your Eclipse project.
+
+If you are using Maven, you also need to change the Java version in your `pom.xml` for the `maven-compiler-plugin`. Otherwise right click the `JRE System Library` section of your project and open the `Properties` window in order to switch to a Java 8 JRE (or above) that supports Lambda Expressions.
+
+The Eclipse JDT compiler needs a special compiler flag in order to store type information in `.class` files. Open the JDT configuration file at `{project directory}/.settings/org.eclipse.jdt.core.prefs` with your favorite text editor and add the following line:
+
+~~~
+org.eclipse.jdt.core.compiler.codegen.lambda.genericSignature=generate
+~~~
+
+If not already done, also modify the Java versions of the following properties to `1.8` (or above):
+
+~~~
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
+org.eclipse.jdt.core.compiler.compliance=1.8
+org.eclipse.jdt.core.compiler.source=1.8
+~~~
+
+After you have saved the file, perform a complete project refresh in Eclipse IDE.
+
+If you are using Maven, right click your Eclipse project and select `Maven` -> `Update Project...`.
+
+You have configured everything correctly if the following Flink program runs without exceptions:
+
+~~~java
+final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.fromElements(1, 2, 3).map((in) -> new Tuple1<String>(" " + in)).print();
+env.execute();
+~~~

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/libraries.md
----------------------------------------------------------------------
diff --git a/docs/dev/libraries.md b/docs/dev/libraries.md
new file mode 100644
index 0000000..dc22e97
--- /dev/null
+++ b/docs/dev/libraries.md
@@ -0,0 +1,24 @@
+---
+title: "Libraries"
+nav-id: libs
+nav-parent_id: dev
+nav-pos: 8
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
new file mode 100644
index 0000000..77266bc
--- /dev/null
+++ b/docs/dev/libs/cep.md
@@ -0,0 +1,652 @@
+---
+title: "FlinkCEP - Complex event processing for Flink"
+nav-title: Event Processing (CEP)
+nav-parent_id: libs
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+FlinkCEP is the complex event processing library for Flink.
+It allows you to easily detect complex event patterns in an endless stream of data.
+Complex events can then be constructed from matching sequences.
+This gives you the opportunity to quickly get hold of what's really important in your data.
+
+<span class="label label-danger">Attention</span> The events in the `DataStream` to which
+you want to apply pattern matching have to implement proper `equals()` and `hashCode()` methods
+because these are used for comparing and matching events.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Getting Started
+
+If you want to jump right in, you have to [set up a Flink program]({{ site.baseurl }}/dev/api_concepts.html#linking-with-flink).
+Next, you have to add the FlinkCEP dependency to the `pom.xml` of your project.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-cep{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-cep-scala{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+</div>
+
+Note that FlinkCEP is currently not part of the binary distribution.
+See how to link with it for cluster execution [here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+Now you can start writing your first CEP program using the pattern API.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Event> input = ...
+
+Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(evt -> evt.getId() == 42)
+    .next("middle").subtype(SubEvent.class).where(subEvt -> subEvt.getVolume() >= 10.0)
+    .followedBy("end").where(evt -> evt.getName().equals("end"));
+
+PatternStream<Event> patternStream = CEP.pattern(input, pattern);
+
+// the lambda parameter must not reuse the name of the local variable `pattern`
+DataStream<Alert> result = patternStream.select(match -> {
+    return createAlertFrom(match);
+});
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[Event] = ...
+
+val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
+  .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
+  .followedBy("end").where(_.getName == "end")
+
+val patternStream = CEP.pattern(input, pattern)
+
+val result: DataStream[Alert] = patternStream.select(createAlert(_))
+{% endhighlight %}
+</div>
+</div>
+
+Note that we use Java 8 lambdas in our Java code examples to make them more succinct.
+
+## The Pattern API
+
+The pattern API allows you to quickly define complex event patterns.
+
+Each pattern consists of multiple stages or what we call states.
+In order to go from one state to the next, the user can specify conditions.
+These conditions can be the contiguity of events or a filter condition on an event.
+
+Each pattern has to start with an initial state:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Pattern<Event, ?> start = Pattern.<Event>begin("start");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val start : Pattern[Event, _] = Pattern.begin("start")
+{% endhighlight %}
+</div>
+</div>
+
+Each state must have a unique name to identify the matched events later on.
+Additionally, we can specify a filter condition for the event to be accepted as the start event via the `where` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+start.where(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) {
+        return ... // some condition
+    }
+});
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+start.where(event => ... /* some condition */)
+{% endhighlight %}
+</div>
+</div>
+
+We can also restrict the type of the accepted event to some subtype of the initial event type (here `Event`) via the `subtype` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+start.subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
+    @Override
+    public boolean filter(SubEvent value) {
+        return ... // some condition
+    }
+});
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)
+{% endhighlight %}
+</div>
+</div>
+
+As can be seen here, the subtype condition can also be combined with an additional filter condition on the subtype.
+In fact you can always provide multiple conditions by calling `where` and 
`subtype` multiple times.
+These conditions will then be combined using the logical AND operator.
+
+In order to construct `or` conditions, call the `or` method with the respective filter function.
+Any existing filter function is then ORed with the given one.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+pattern.where(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) {
+        return ... // some condition
+    }
+}).or(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) {
+        return ... // or condition
+    }
+});
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+pattern.where(event => ... /* some condition */).or(event => ... /* or 
condition */)
+{% endhighlight %}
+</div>
+</div>
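+
+How these conditions compose can be illustrated without Flink. The following is a minimal sketch, assuming that repeated `where` calls are ANDed and that a later `or` is ORed with everything accumulated so far, as described above. The class `ConditionCombinationSketch` and its `Event` type are hypothetical, and `java.util.function.Predicate` merely stands in for a filter function; this is not the library's implementation.
+

```java
import java.util.function.Predicate;

class ConditionCombinationSketch {
    // Hypothetical stand-in for an event type; not part of Flink.
    static final class Event {
        final String name;
        final double value;

        Event(String name, double value) {
            this.name = name;
            this.value = value;
        }
    }

    /**
     * Builds (isError AND isLarge) OR isCritical, which mirrors a call chain
     * of the shape where(isError).where(isLarge).or(isCritical).
     */
    static Predicate<Event> combined() {
        Predicate<Event> isError = e -> e.name.equals("error");
        Predicate<Event> isLarge = e -> e.value > 100.0;
        Predicate<Event> isCritical = e -> e.name.equals("critical");
        // The existing (ANDed) condition is ORed with the new one.
        return isError.and(isLarge).or(isCritical);
    }
}
```

+
+Under these assumptions, a large `error` event and any `critical` event are accepted, while a small `error` event is rejected.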
+
+Next, we can append further states to detect complex patterns.
+We can control the contiguity of two succeeding events to be accepted by the 
pattern.
+
+Strict contiguity means that two matching events have to directly succeed one another.
+This means that no other events can occur in between.
+A strict contiguity pattern state can be created via the `next` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Pattern<Event, ?> strictNext = start.next("middle");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val strictNext: Pattern[Event, _] = start.next("middle")
+{% endhighlight %}
+</div>
+</div>
+
+Non-strict contiguity means that other events are allowed to occur in-between 
two matching events.
+A non-strict contiguity pattern state can be created via the `followedBy` 
method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Pattern<Event, ?> nonStrictNext = start.followedBy("middle");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")
+{% endhighlight %}
+</div>
+</div>
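+
+The difference between the two contiguity modes can be sketched on a plain list of event names. This is an illustration only: `ContiguitySketch` is a hypothetical helper, not part of the CEP library, and it assumes that the non-strict variant reports only the earliest later candidate, which may differ from the library's actual matching semantics.
+

```java
import java.util.ArrayList;
import java.util.List;

class ContiguitySketch {
    /** Strict contiguity (like next): second must directly follow first. Returns index pairs. */
    static List<int[]> strictMatches(List<String> events, String first, String second) {
        List<int[]> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals(first) && events.get(i + 1).equals(second)) {
                matches.add(new int[] {i, i + 1});
            }
        }
        return matches;
    }

    /** Non-strict contiguity (like followedBy): other events may occur in between. */
    static List<int[]> relaxedMatches(List<String> events, String first, String second) {
        List<int[]> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals(first)) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).equals(second)) {
                    matches.add(new int[] {i, j});
                    break; // assumption: only the earliest later candidate is reported
                }
            }
        }
        return matches;
    }
}
```

+
+For the sequence `a, c, b`, the strict variant finds no match for `a` then `b`, whereas the non-strict variant matches the events at positions 0 and 2.
+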
+
+It is also possible to define a temporal constraint for the pattern to be valid.
+For example, one can define that a pattern should occur within 10 seconds via 
the `within` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+next.within(Time.seconds(10));
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+next.within(Time.seconds(10))
+{% endhighlight %}
+</div>
+</div>
+
+<br />
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 25%">Pattern Operation</th>
+            <th class="text-center">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><strong>Begin</strong></td>
+            <td>
+            <p>Defines a starting pattern state:</p>
+{% highlight java %}
+Pattern<Event, ?> start = Pattern.<Event>begin("start");
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>Next</strong></td>
+            <td>
+                <p>Appends a new pattern state. A matching event has to 
directly succeed the previous matching event:</p>
+{% highlight java %}
+Pattern<Event, ?> next = start.next("next");
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>FollowedBy</strong></td>
+            <td>
+                <p>Appends a new pattern state. Other events can occur between 
a matching event and the previous matching event:</p>
+{% highlight java %}
+Pattern<Event, ?> followedBy = start.followedBy("next");
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>Where</strong></td>
+            <td>
+                <p>Defines a filter condition for the current pattern state. 
Only if an event passes the filter, it can match the state:</p>
+{% highlight java %}
+patternState.where(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return ... // some condition
+    }
+});
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>Or</strong></td>
+            <td>
+                <p>Adds a new filter condition which is ORed with an existing 
filter condition. Only if an event passes the filter condition, it can match 
the state:</p>
+{% highlight java %}
+patternState.where(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return ... // some condition
+    }
+}).or(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return ... // alternative condition
+    }
+});
+{% endhighlight %}
+                    </td>
+                </tr>
+       <tr>
+           <td><strong>Subtype</strong></td>
+           <td>
+               <p>Defines a subtype condition for the current pattern state. 
Only if an event is of this subtype, it can match the state:</p>
+{% highlight java %}
+patternState.subtype(SubEvent.class);
+{% endhighlight %}
+           </td>
+       </tr>
+       <tr>
+          <td><strong>Within</strong></td>
+          <td>
+              <p>Defines the maximum time interval for an event sequence to 
match the pattern. If a non-completed event sequence exceeds this time, it is 
discarded:</p>
+{% highlight java %}
+patternState.within(Time.seconds(10));
+{% endhighlight %}
+          </td>
+      </tr>
+  </tbody>
+</table>
+</div>
+
+<div data-lang="scala" markdown="1">
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 25%">Pattern Operation</th>
+            <th class="text-center">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><strong>Begin</strong></td>
+            <td>
+            <p>Defines a starting pattern state:</p>
+{% highlight scala %}
+val start = Pattern.begin[Event]("start")
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>Next</strong></td>
+            <td>
+                <p>Appends a new pattern state. A matching event has to 
directly succeed the previous matching event:</p>
+{% highlight scala %}
+val next = start.next("middle")
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>FollowedBy</strong></td>
+            <td>
+                <p>Appends a new pattern state. Other events can occur between 
a matching event and the previous matching event:</p>
+{% highlight scala %}
+val followedBy = start.followedBy("middle")
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>Where</strong></td>
+            <td>
+                <p>Defines a filter condition for the current pattern state. 
Only if an event passes the filter, it can match the state:</p>
+{% highlight scala %}
+patternState.where(event => ... /* some condition */)
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>Or</strong></td>
+            <td>
+                <p>Adds a new filter condition which is ORed with an existing 
filter condition. Only if an event passes the filter condition, it can match 
the state:</p>
+{% highlight scala %}
+patternState.where(event => ... /* some condition */)
+    .or(event => ... /* alternative condition */)
+{% endhighlight %}
+                    </td>
+                </tr>
+       <tr>
+           <td><strong>Subtype</strong></td>
+           <td>
+               <p>Defines a subtype condition for the current pattern state. 
Only if an event is of this subtype, it can match the state:</p>
+{% highlight scala %}
+patternState.subtype(classOf[SubEvent])
+{% endhighlight %}
+           </td>
+       </tr>
+       <tr>
+          <td><strong>Within</strong></td>
+          <td>
+              <p>Defines the maximum time interval for an event sequence to 
match the pattern. If a non-completed event sequence exceeds this time, it is 
discarded:</p>
+{% highlight scala %}
+patternState.within(Time.seconds(10))
+{% endhighlight %}
+          </td>
+      </tr>
+  </tbody>
+</table>
+</div>
+
+</div>
+
+### Detecting Patterns
+
+In order to run a stream of events against your pattern, you have to create a 
`PatternStream`.
+Given an input stream `input` and a pattern `pattern`, you create the 
`PatternStream` by calling
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Event> input = ...
+Pattern<Event, ?> pattern = ...
+
+PatternStream<Event> patternStream = CEP.pattern(input, pattern);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input : DataStream[Event] = ...
+val pattern : Pattern[Event, _] = ...
+
+val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
+{% endhighlight %}
+</div>
+</div>
+
+### Selecting from Patterns
+Once you have obtained a `PatternStream` you can select from detected event 
sequences via the `select` or `flatSelect` methods.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+The `select` method requires a `PatternSelectFunction` implementation.
+A `PatternSelectFunction` has a `select` method which is called for each 
matching event sequence.
+It receives a map of string/event pairs of the matched events.
+The string is defined by the name of the state to which the event has been 
matched.
+The `select` method can return exactly one result.
+
+{% highlight java %}
+class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, 
OUT> {
+    @Override
+    public OUT select(Map<String, IN> pattern) {
+        IN startEvent = pattern.get("start");
+        IN endEvent = pattern.get("end");
+        return new OUT(startEvent, endEvent);
+    }
+}
+{% endhighlight %}
+
+A `PatternFlatSelectFunction` is similar to the `PatternSelectFunction`, with 
the only distinction that it can return an arbitrary number of results.
+In order to do this, its `flatSelect` method has an additional `Collector` parameter which is used for the element output.
+
+{% highlight java %}
+class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
+    @Override
+    public void flatSelect(Map<String, IN> pattern, Collector<OUT> collector) {
+        IN startEvent = pattern.get("start");
+        IN endEvent = pattern.get("end");
+
+        for (int i = 0; i < startEvent.getValue(); i++ ) {
+            collector.collect(new OUT(startEvent, endEvent));
+        }
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+The `select` method takes a selection function as argument, which is called 
for each matching event sequence.
+It receives a map of string/event pairs of the matched events.
+The string is defined by the name of the state to which the event has been 
matched.
+The selection function returns exactly one result per call.
+
+{% highlight scala %}
+def selectFn(pattern : mutable.Map[String, IN]): OUT = {
+    val startEvent = pattern.get("start").get
+    val endEvent = pattern.get("end").get
+    OUT(startEvent, endEvent)
+}
+{% endhighlight %}
+
+The `flatSelect` method is similar to the `select` method. Their only 
difference is that the function passed to the `flatSelect` method can return an 
arbitrary number of results per call.
+In order to do this, the function for `flatSelect` has an additional 
`Collector` parameter which is used for the element output.
+
+{% highlight scala %}
+def flatSelectFn(pattern : mutable.Map[String, IN], collector : 
Collector[OUT]) = {
+    val startEvent = pattern.get("start").get
+    val endEvent = pattern.get("end").get
+    for (i <- 0 until startEvent.getValue) {
+        collector.collect(OUT(startEvent, endEvent))
+    }
+}
+{% endhighlight %}
+</div>
+</div>
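+
+The contrast between the two selection styles can be sketched in plain Java. The helper below is hypothetical and independent of Flink: it uses `java.util.function.Function` for a select-style function that returns exactly one result, and a `BiConsumer` receiving a `Consumer` as a stand-in for a collector that may be called any number of times.
+

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

class SelectSketch {
    /** Select style: the function yields exactly one result per matched sequence. */
    static <IN, OUT> List<OUT> select(
            List<Map<String, IN>> matches, Function<Map<String, IN>, OUT> fn) {
        List<OUT> out = new ArrayList<>();
        for (Map<String, IN> match : matches) {
            out.add(fn.apply(match));
        }
        return out;
    }

    /** Flat-select style: the function may emit zero or more results via the collector. */
    static <IN, OUT> List<OUT> flatSelect(
            List<Map<String, IN>> matches, BiConsumer<Map<String, IN>, Consumer<OUT>> fn) {
        List<OUT> out = new ArrayList<>();
        for (Map<String, IN> match : matches) {
            fn.accept(match, out::add); // out::add plays the role of the collector
        }
        return out;
    }
}
```

+
+With a single match, `select` always yields one element, while `flatSelect` yields as many elements as the passed function chooses to emit.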
+
+### Handling Timed Out Partial Patterns
+
+Whenever a pattern has a window length associated via the `within` method, it is possible that partial event patterns will be discarded because they exceed the window length.
+In order to react to these timeout events, the `select` and `flatSelect` API calls allow you to specify a timeout handler.
+This timeout handler is called for each partial event pattern which has timed 
out.
+The timeout handler receives all so far matched events of the partial pattern 
and the timestamp when the timeout was detected.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+In order to handle partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes a `PatternTimeoutFunction`/`PatternFlatTimeoutFunction` as the first parameter and the known `PatternSelectFunction`/`PatternFlatSelectFunction` as the second parameter.
+The return type of the timeout function can be different from the select 
function.
+The timeout event and the select event are wrapped in `Either.Left` and 
`Either.Right` respectively so that the resulting data stream is of type 
`org.apache.flink.types.Either`.
+
+{% highlight java %}
+PatternStream<Event> patternStream = CEP.pattern(input, pattern);
+
+DataStream<Either<TimeoutEvent, ComplexEvent>> result = patternStream.select(
+    new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
+    new PatternSelectFunction<Event, ComplexEvent>() {...}
+);
+
+DataStream<Either<TimeoutEvent, ComplexEvent>> flatResult = 
patternStream.flatSelect(
+    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
+    new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
+);
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala" markdown="1">
+In order to treat partial patterns, the `select` API call offers an overloaded 
version which takes as the first parameter a timeout function and as second 
parameter a selection function.
+The timeout function is called with a map of string-event pairs of the partial 
match which has timed out and a long indicating when the timeout occurred.
+The string is defined by the name of the state to which the event has been 
matched.
+The timeout function returns exactly one result per call.
+The return type of the timeout function can be different from the select 
function.
+The timeout event and the select event are wrapped in `Left` and `Right` 
respectively so that the resulting data stream is of type `Either`.
+
+{% highlight scala %}
+val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
+
+val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.select{
+    (pattern: mutable.Map[String, Event], timestamp: Long) => TimeoutEvent()
+} {
+    pattern: mutable.Map[String, Event] => ComplexEvent()
+}
+{% endhighlight %}
+
+The `flatSelect` API call offers the same overloaded version which takes as 
the first parameter a timeout function and as second parameter a selection 
function.
+In contrast to the `select` functions, the `flatSelect` functions are called with a `Collector`.
+The collector can be used to emit an arbitrary number of events.
+
+{% highlight scala %}
+val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
+
+val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.flatSelect{
+    (pattern: mutable.Map[String, Event], timestamp: Long, out: 
Collector[TimeoutEvent]) =>
+        out.collect(TimeoutEvent())
+} {
+    (pattern: mutable.Map[String, Event], out: Collector[ComplexEvent]) =>
+        out.collect(ComplexEvent())
+}
+{% endhighlight %}
+
+</div>
+</div>
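+
+The decision between a completed match and a timed-out partial match can be sketched as follows. This is a simplified illustration, not the library's logic: `TimeoutSketch` and `PartialMatch` are hypothetical, the string labels stand in for the left (timeout) and right (match) sides of an `Either`, and treating the window boundary as inclusive for timeouts is an assumption.
+

```java
import java.util.ArrayList;
import java.util.List;

class TimeoutSketch {
    /** Hypothetical partial match: its start time and whether all states have matched. */
    static final class PartialMatch {
        final long startMillis;
        final boolean complete;

        PartialMatch(long startMillis, boolean complete) {
            this.startMillis = startMillis;
            this.complete = complete;
        }
    }

    /**
     * Returns one label per decided sequence: "match" for completed sequences
     * (the right side) and "timeout@<time>" for timed-out partial ones (the left side).
     * Partial sequences still inside the window produce no output yet.
     */
    static List<String> evaluate(List<PartialMatch> matches, long nowMillis, long windowMillis) {
        List<String> results = new ArrayList<>();
        for (PartialMatch m : matches) {
            if (m.complete) {
                results.add("match");
            } else if (nowMillis - m.startMillis >= windowMillis) {
                // Assumption: a partial match times out once the full window has elapsed.
                results.add("timeout@" + nowMillis);
            }
        }
        return results;
    }
}
```

+
+The timeout label carries the time at which the timeout was detected, analogous to the timestamp passed to the timeout handler.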
+
+## Examples
+
+The following example detects the pattern `start, middle(name = "error") -> 
end(name = "critical")` on a keyed data stream of `Events`.
+The events are keyed by their ids and a valid pattern has to occur within 10 
seconds.
+The whole processing is done with event time.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = ...
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+DataStream<Event> input = ...
+
+DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, 
Integer>() {
+       @Override
+       public Integer getKey(Event value) throws Exception {
+               return value.getId();
+       }
+});
+
+Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
+       .next("middle").where(new FilterFunction<Event>() {
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("error");
+               }
+       }).followedBy("end").where(new FilterFunction<Event>() {
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("critical");
+               }
+       }).within(Time.seconds(10));
+
+PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
+
+DataStream<Alert> alerts = patternStream.select(new 
PatternSelectFunction<Event, Alert>() {
+       @Override
+       public Alert select(Map<String, Event> pattern) throws Exception {
+               return createAlert(pattern);
+       }
+});
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env : StreamExecutionEnvironment = ...
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+val input : DataStream[Event] = ...
+
+val partitionedInput = input.keyBy(event => event.getId)
+
+val pattern = Pattern.begin[Event]("start")
+  .next("middle").where(_.getName == "error")
+  .followedBy("end").where(_.getName == "critical")
+  .within(Time.seconds(10))
+
+val patternStream = CEP.pattern(partitionedInput, pattern)
+
+val alerts = patternStream.select(createAlert(_))
+{% endhighlight %}
+</div>
+</div>

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/libs/gelly/graph_algorithms.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/graph_algorithms.md 
b/docs/dev/libs/gelly/graph_algorithms.md
new file mode 100644
index 0000000..09f6abc
--- /dev/null
+++ b/docs/dev/libs/gelly/graph_algorithms.md
@@ -0,0 +1,308 @@
+---
+title: Graph Algorithms
+nav-parent_id: graphs
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+The logic blocks with which the `Graph` API and top-level algorithms are 
assembled are accessible in Gelly as graph
+algorithms in the `org.apache.flink.graph.asm` package. These algorithms 
provide optimization and tuning through
+configuration parameters and may provide implicit runtime reuse when 
processing the same input with a similar
+configuration.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Algorithm</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td>degree.annotate.directed.<br/><strong>VertexInDegree</strong></td>
+      <td>
+        <p>Annotate vertices of a <a href="#graph-representation">directed 
graph</a> with the in-degree.</p>
+{% highlight java %}
+DataSet<Vertex<K, LongValue>> inDegree = graph
+  .run(new VertexInDegree()
+    .setIncludeZeroDegreeVertices(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setIncludeZeroDegreeVertices</strong>: by default 
only the edge set is processed for the computation of degree; when this flag is 
set an additional join is performed against the vertex set in order to output 
vertices with an in-degree of zero</p></li>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.directed.<br/><strong>VertexOutDegree</strong></td>
+      <td>
+        <p>Annotate vertices of a <a href="#graph-representation">directed 
graph</a> with the out-degree.</p>
+{% highlight java %}
+DataSet<Vertex<K, LongValue>> outDegree = graph
+  .run(new VertexOutDegree()
+    .setIncludeZeroDegreeVertices(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setIncludeZeroDegreeVertices</strong>: by default 
only the edge set is processed for the computation of degree; when this flag is 
set an additional join is performed against the vertex set in order to output 
vertices with an out-degree of zero</p></li>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.directed.<br/><strong>VertexDegrees</strong></td>
+      <td>
+        <p>Annotate vertices of a <a href="#graph-representation">directed 
graph</a> with the degree, out-degree, and in-degree.</p>
+{% highlight java %}
+DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> degrees = graph
+  .run(new VertexDegrees()
+    .setIncludeZeroDegreeVertices(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setIncludeZeroDegreeVertices</strong>: by default 
only the edge set is processed for the computation of degree; when this flag is 
set an additional join is performed against the vertex set in order to output 
vertices with out- and in-degree of zero</p></li>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.directed.<br/><strong>EdgeSourceDegrees</strong></td>
+      <td>
+        <p>Annotate edges of a <a href="#graph-representation">directed 
graph</a> with the degree, out-degree, and in-degree of the source ID.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple2<EV, Degrees>>> sourceDegrees = graph
+  .run(new EdgeSourceDegrees());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.directed.<br/><strong>EdgeTargetDegrees</strong></td>
+      <td>
+        <p>Annotate edges of a <a href="#graph-representation">directed 
graph</a> with the degree, out-degree, and in-degree of the target ID.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple2<EV, Degrees>>> targetDegrees = graph
+  .run(new EdgeTargetDegrees());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.directed.<br/><strong>EdgeDegreesPair</strong></td>
+      <td>
+        <p>Annotate edges of a <a href="#graph-representation">directed 
graph</a> with the degree, out-degree, and in-degree of both the source and 
target vertices.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> degrees = graph
+  .run(new EdgeDegreesPair());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.undirected.<br/><strong>VertexDegree</strong></td>
+      <td>
+        <p>Annotate vertices of an <a href="#graph-representation">undirected 
graph</a> with the degree.</p>
+{% highlight java %}
+DataSet<Vertex<K, LongValue>> degree = graph
+  .run(new VertexDegree()
+    .setIncludeZeroDegreeVertices(true)
+    .setReduceOnTargetId(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setIncludeZeroDegreeVertices</strong>: by default 
only the edge set is processed for the computation of degree; when this flag is 
set an additional join is performed against the vertex set in order to output 
vertices with a degree of zero</p></li>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+          <li><p><strong>setReduceOnTargetId</strong>: the degree can be 
counted from either the edge source or target IDs. By default the source IDs 
are counted. Reducing on target IDs may optimize the algorithm if the input 
edge list is sorted by target ID.</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      
<td>degree.annotate.undirected.<br/><strong>EdgeSourceDegree</strong></td>
+      <td>
+        <p>Annotate edges of an <a href="#graph-representation">undirected 
graph</a> with degree of the source ID.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple2<EV, LongValue>>> sourceDegree = graph
+  .run(new EdgeSourceDegree()
+    .setReduceOnTargetId(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+          <li><p><strong>setReduceOnTargetId</strong>: the degree can be 
counted from either the edge source or target IDs. By default the source IDs 
are counted. Reducing on target IDs may optimize the algorithm if the input 
edge list is sorted by target ID.</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      
<td>degree.annotate.undirected.<br/><strong>EdgeTargetDegree</strong></td>
+      <td>
+        <p>Annotate edges of an <a href="#graph-representation">undirected 
graph</a> with degree of the target ID.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple2<EV, LongValue>>> targetDegree = graph
+  .run(new EdgeTargetDegree()
+    .setReduceOnSourceId(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+          <li><p><strong>setReduceOnSourceId</strong>: the degree can be 
counted from either the edge source or target IDs. By default the target IDs 
are counted. Reducing on source IDs may optimize the algorithm if the input 
edge list is sorted by source ID.</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.undirected.<br/><strong>EdgeDegreePair</strong></td>
+      <td>
+        <p>Annotate edges of an <a href="#graph-representation">undirected 
graph</a> with the degree of both the source and target vertices.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> pairDegree = graph
+  .run(new EdgeDegreePair()
+    .setReduceOnTargetId(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+          <li><p><strong>setReduceOnTargetId</strong>: the degree can be 
counted from either the edge source or target IDs. By default the source IDs 
are counted. Reducing on target IDs may optimize the algorithm if the input 
edge list is sorted by target ID.</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.filter.undirected.<br/><strong>MaximumDegree</strong></td>
+      <td>
+        <p>Filter an <a href="#graph-representation">undirected graph</a> by 
maximum degree.</p>
+{% highlight java %}
+Graph<K, VV, EV> filteredGraph = graph
+  .run(new MaximumDegree(5000)
+    .setBroadcastHighDegreeVertices(true)
+    .setReduceOnTargetId(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setBroadcastHighDegreeVertices</strong>: join 
high-degree vertices using a broadcast-hash to reduce data shuffling when 
removing a relatively small number of high-degree vertices.</p></li>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+          <li><p><strong>setReduceOnTargetId</strong>: the degree can be 
counted from either the edge source or target IDs. By default the source IDs 
are counted. Reducing on target IDs may optimize the algorithm if the input 
edge list is sorted by target ID.</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>simple.directed.<br/><strong>Simplify</strong></td>
+      <td>
+        <p>Remove self-loops and duplicate edges from a <a 
href="#graph-representation">directed graph</a>.</p>
+{% highlight java %}
+graph.run(new Simplify());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>simple.undirected.<br/><strong>Simplify</strong></td>
+      <td>
+        <p>Add symmetric edges and remove self-loops and duplicate edges from 
an <a href="#graph-representation">undirected graph</a>.</p>
+{% highlight java %}
+graph.run(new Simplify());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>translate.<br/><strong>TranslateGraphIds</strong></td>
+      <td>
+        <p>Translate vertex and edge IDs using the given 
<code>TranslateFunction</code>.</p>
+{% highlight java %}
+graph.run(new TranslateGraphIds(new LongValueToStringValue()));
+{% endhighlight %}
+        <p>Required configuration:</p>
+        <ul>
+          <li><p><strong>translator</strong>: implements type or value 
conversion</p></li>
+        </ul>
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>translate.<br/><strong>TranslateVertexValues</strong></td>
+      <td>
+        <p>Translate vertex values using the given 
<code>TranslateFunction</code>.</p>
+{% highlight java %}
+graph.run(new TranslateVertexValues(new LongValueAddOffset(vertexCount)));
+{% endhighlight %}
+        <p>Required configuration:</p>
+        <ul>
+          <li><p><strong>translator</strong>: implements type or value 
conversion</p></li>
+        </ul>
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>translate.<br/><strong>TranslateEdgeValues</strong></td>
+      <td>
+        <p>Translate edge values using the given 
<code>TranslateFunction</code>.</p>
+{% highlight java %}
+graph.run(new TranslateEdgeValues(new Nullify()));
+{% endhighlight %}
+        <p>Required configuration:</p>
+        <ul>
+          <li><p><strong>translator</strong>: implements type or value 
conversion</p></li>
+        </ul>
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+  </tbody>
+</table>
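+
+The effect of `setIncludeZeroDegreeVertices` on the degree annotations can be illustrated with a small, Gelly-independent sketch. The class `InDegreeSketch` is hypothetical and works on in-memory collections rather than a `DataSet`. It assumes edges are given as `{source, target}` pairs, and it models the additional join against the vertex set as a simple fill-in of missing vertices.
+

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class InDegreeSketch {
    /**
     * Counts in-degrees from the edge list alone. When the flag is set, vertices
     * that never appear as an edge target are added with an in-degree of zero.
     */
    static Map<Long, Long> inDegree(
            List<long[]> edges, Set<Long> vertices, boolean includeZeroDegreeVertices) {
        Map<Long, Long> degree = new TreeMap<>();
        for (long[] edge : edges) {
            degree.merge(edge[1], 1L, Long::sum); // edge[1] is the target ID
        }
        if (includeZeroDegreeVertices) {
            for (Long vertex : vertices) {
                degree.putIfAbsent(vertex, 0L); // stands in for the join with the vertex set
            }
        }
        return degree;
    }
}
```

+
+For the edges 1→2, 3→2, and 2→3, vertex 1 appears in the result only when the flag is set, because it is never the target of an edge.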
+
+{% top %}
