http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/libs/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/storm_compatibility.md 
b/docs/dev/libs/storm_compatibility.md
new file mode 100644
index 0000000..89d7706
--- /dev/null
+++ b/docs/dev/libs/storm_compatibility.md
@@ -0,0 +1,287 @@
+---
+title: "Storm Compatibility"
+is_beta: true
+nav-parent_id: libs
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+[Flink streaming]({{ site.baseurl }}/dev/datastream_api.html) is compatible 
with Apache Storm interfaces and therefore allows
+reusing code that was implemented for Storm.
+
+You can:
+
+- execute a whole Storm `Topology` in Flink.
+- use Storm `Spout`/`Bolt` as source/operator in Flink streaming programs.
+
+This document shows how to use existing Storm code with Flink.
+
+* This will be replaced by the TOC
+{:toc}
+
+# Project Configuration
+
+Support for Storm is contained in the `flink-storm` Maven module.
+The code resides in the `org.apache.flink.storm` package.
+
+Add the following dependency to your `pom.xml` if you want to execute Storm 
code in Flink.
+
+~~~xml
+<dependency>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-storm{{ site.scala_version_suffix }}</artifactId>
+       <version>{{site.version}}</version>
+</dependency>
+~~~
+
+**Please note**: Do not add `storm-core` as a dependency. It is already 
included via `flink-storm`.
+
+**Please note**: `flink-storm` is not part of the provided binary Flink 
distribution.
+Thus, you need to include `flink-storm` classes (and their dependencies) in 
your program jar (also called ueber-jar or fat-jar) that is submitted to 
Flink's JobManager.
+See *WordCount Storm* within `flink-storm-examples/pom.xml` for an example of how to package a jar correctly.
+
+If you want to avoid large ueber-jars, you can manually copy 
`storm-core-0.9.4.jar`, `json-simple-1.1.jar` and 
`flink-storm-{{site.version}}.jar` into Flink's `lib/` folder of each cluster 
node (*before* the cluster is started).
+For this case, it is sufficient to include only your own Spout and Bolt 
classes (and their internal dependencies) into the program jar.
+
+# Execute Storm Topologies
+
+Flink provides a Storm compatible API (`org.apache.flink.storm.api`) that 
offers replacements for the following classes:
+
+- `StormSubmitter` replaced by `FlinkSubmitter`
+- `NimbusClient` and `Client` replaced by `FlinkClient`
+- `LocalCluster` replaced by `FlinkLocalCluster`
+
+In order to submit a Storm topology to Flink, it is sufficient to replace the 
used Storm classes with their Flink replacements in the Storm *client code that 
assembles* the topology.
+The actual runtime code, i.e., Spouts and Bolts, can be used *unmodified*.
+If a topology is executed in a remote cluster, the parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanager.rpc.address` and `jobmanager.rpc.port`, respectively. If a parameter is not specified, the value is taken from `flink-conf.yaml`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder
+
+// actual topology assembling code and used Spouts/Bolts can be used as-is
+builder.setSpout("source", new FileSpout(inputFilePath));
+builder.setBolt("tokenizer", new BoltTokenizer()).shuffleGrouping("source");
+builder.setBolt("counter", new BoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
+builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("counter");
+
+Config conf = new Config();
+if(runLocal) { // submit to test cluster
+       // replaces: LocalCluster cluster = new LocalCluster();
+       FlinkLocalCluster cluster = new FlinkLocalCluster();
+       cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
+} else { // submit to remote cluster
+       // optional
+       // conf.put(Config.NIMBUS_HOST, "remoteHost");
+       // conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
+       // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
+       FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
+}
+~~~
+</div>
+</div>
+
+# Embed Storm Operators in Flink Streaming Programs
+
+As an alternative, Spouts and Bolts can be embedded into regular streaming 
programs.
+The Storm compatibility layer offers a wrapper class for each, namely `SpoutWrapper` and `BoltWrapper` (`org.apache.flink.storm.wrappers`).
+
+By default, both wrappers convert Storm output tuples to Flink's [Tuple]({{site.baseurl}}/dev/api_concepts.html#tuples-and-case-classes) types (i.e., `Tuple0` to `Tuple25` according to the number of fields of the Storm tuples).
+For single-field output tuples, a conversion to the field's data type is also possible (e.g., `String` instead of `Tuple1<String>`).
+
+Because Flink cannot infer the output field types of Storm operators, it is 
required to specify the output type manually.
+In order to get the correct `TypeInformation` object, Flink's `TypeExtractor` 
can be used.
+
+## Embed Spouts
+
+In order to use a Spout as Flink source, use 
`StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation)`.
+The Spout object is handed to the constructor of `SpoutWrapper<OUT>` that 
serves as first argument to `addSource(...)`.
+The generic type declaration `OUT` specifies the type of the source output 
stream.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// stream has `raw` type (single field output streams only)
+DataStream<String> rawInput = env.addSource(
+       new SpoutWrapper<String>(new FileSpout(localFilePath), new String[] { Utils.DEFAULT_STREAM_ID }), // emit default output stream as raw type
+       TypeExtractor.getForClass(String.class)); // output type
+
+// process data stream
+[...]
+~~~
+</div>
+</div>
+
+If a Spout emits a finite number of tuples, `SpoutWrapper` can be configured to terminate automatically by setting the `numberOfInvocations` parameter in its constructor.
+This allows the Flink program to shut down automatically after all data is processed.
+By default, the program runs until it is [canceled]({{site.baseurl}}/setup/cli.html) manually.
+
+
+## Embed Bolts
+
+In order to use a Bolt as Flink operator, use `DataStream.transform(String, 
TypeInformation, OneInputStreamOperator)`.
+The Bolt object is handed to the constructor of `BoltWrapper<IN,OUT>` that 
serves as last argument to `transform(...)`.
+The generic type declarations `IN` and `OUT` specify the type of the 
operator's input and output stream, respectively.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+DataStream<String> text = env.readTextFile(localFilePath);
+
+DataStream<Tuple2<String, Integer>> counts = text.transform(
+       "tokenizer", // operator name
+       TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), // output type
+       new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer())); // Bolt operator
+
+// do further processing
+[...]
+~~~
+</div>
+</div>
+
+### Named Attribute Access for Embedded Bolts
+
+Bolts can access input tuple fields by name (in addition to access by index).
+To use this feature with embedded Bolts, you need to have either a
+
+ 1. [POJO]({{site.baseurl}}/dev/api_concepts.html#pojos) type input stream or
+ 2. [Tuple]({{site.baseurl}}/dev/api_concepts.html#tuples-and-case-classes) type input stream and specify the input schema (i.e., the name-to-index mapping)
+
+For POJO input types, Flink accesses the fields via reflection.
+For this case, Flink expects either a corresponding public member variable or 
public getter method.
+For example, if a Bolt accesses a field via name `sentence` (e.g., `String s = input.getStringByField("sentence");`), the input POJO class must have a member variable `public String sentence;` or a method `public String getSentence() { ... }` (pay attention to camel-case naming).
+
+For `Tuple` input types, it is required to specify the input schema using Storm's `Fields` class.
+In this case, the constructor of `BoltWrapper` takes an additional argument: `new BoltWrapper<Tuple1<String>, ...>(..., new Fields("sentence"))`.
+The input type is `Tuple1<String>`, and `Fields("sentence")` specifies that `input.getStringByField("sentence")` is equivalent to `input.getString(0)`.
+
+See 
[BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java)
 and 
[BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java)
 for examples.
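The name-to-index mapping described above can be pictured with a small plain-Java sketch. It is an illustration only, not the `flink-storm` implementation: `NamedFieldSketch`, `schema(...)` and `getByField(...)` are hypothetical names, and a `List<Object>` stands in for a tuple.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; this is NOT the flink-storm implementation.
final class NamedFieldSketch {

    // Hypothetical stand-in for an input schema such as Fields("sentence"):
    // maps each field name to its position in the tuple.
    static Map<String, Integer> schema(String... names) {
        Map<String, Integer> index = new HashMap<>();
        for (int i = 0; i < names.length; i++) {
            index.put(names[i], i);
        }
        return index;
    }

    // Access by name is translated into access by index.
    static Object getByField(List<Object> tuple, Map<String, Integer> schema, String name) {
        Integer pos = schema.get(name);
        if (pos == null) {
            throw new IllegalArgumentException("Unknown field: " + name);
        }
        return tuple.get(pos);
    }

    public static void main(String[] args) {
        Map<String, Integer> schema = schema("sentence");
        List<Object> tuple = Arrays.<Object>asList("to be or not to be");
        // With this schema, access by name "sentence" and access by index 0
        // return the same value.
        System.out.println(getByField(tuple, schema, "sentence").equals(tuple.get(0)));
    }
}
```

For POJO input types no such schema is needed, because the field name itself identifies the member variable or getter.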
+
+## Configuring Spouts and Bolts
+
+In Storm, Spouts and Bolts can be configured with a globally distributed `Map` 
object that is given to `submitTopology(...)` method of `LocalCluster` or 
`StormSubmitter`.
+This `Map` is provided by the user next to the topology and gets forwarded as 
a parameter to the calls `Spout.open(...)` and `Bolt.prepare(...)`.
+If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., 
there is no special attention required &ndash; it works as in regular Storm.
+
+For embedded usage, Flink's configuration mechanism must be used.
+A global configuration can be set in a `StreamExecutionEnvironment` via 
`.getConfig().setGlobalJobParameters(...)`.
+Flink's regular `Configuration` class can be used to configure Spouts and 
Bolts.
+However, `Configuration` does not support arbitrary key data types as Storm 
does (only `String` keys are allowed).
+Thus, Flink additionally provides `StormConfig` class that can be used like a 
raw `Map` to provide full compatibility to Storm.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+StormConfig config = new StormConfig();
+// set config values
+[...]
+
+// set global Storm configuration
+env.getConfig().setGlobalJobParameters(config);
+
+// assemble program with embedded Spouts and/or Bolts
+[...]
+~~~
+</div>
+</div>
+
+## Multiple Output Streams
+
+Flink can also handle the declaration of multiple output streams for Spouts 
and Bolts.
+If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., 
there is no special attention required &ndash; it works as in regular Storm.
+
+For embedded usage, the output stream will be of data type 
`SplitStreamType<T>` and must be split by using `DataStream.split(...)` and 
`SplitStream.select(...)`.
+Flink provides the predefined output selector `StormStreamSelector<T>` for 
`.split(...)` already.
+Furthermore, the wrapper type `SplitStreamType<T>` can be removed using `SplitStreamMapper<T>`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+[...]
+
+// get DataStream from Spout or Bolt which declares two output streams s1 and s2 with output type SomeType
+DataStream<SplitStreamType<SomeType>> multiStream = ...
+
+SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new StormStreamSelector<SomeType>());
+
+// remove SplitStreamType using SplitStreamMapper to get data stream of type SomeType
+DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
+DataStream<SomeType> s2 = splitStream.select("s2").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
+
+// do further processing on s1 and s2
+[...]
+~~~
+</div>
+</div>
+
+See 
[SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java)
 for a full example.
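Conceptually, splitting a multi-stream output means filtering records by their stream id and then dropping the wrapper. The following plain-Java sketch shows that idea on an in-memory list; `SplitSketch`, `Tagged` and `select(...)` are hypothetical stand-ins and do not reproduce the actual `SplitStreamType`, `StormStreamSelector` or `SplitStreamMapper` classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; the class and method names are hypothetical.
final class SplitSketch {

    // Hypothetical stand-in for a record that carries the id of the
    // Storm output stream it was emitted to.
    static final class Tagged<T> {
        final String streamId;
        final T value;

        Tagged(String streamId, T value) {
            this.streamId = streamId;
            this.value = value;
        }
    }

    // Keep only the records of one stream and strip the tag,
    // i.e. "select" followed by "unwrap".
    static <T> List<T> select(List<Tagged<T>> records, String streamId) {
        List<T> out = new ArrayList<>();
        for (Tagged<T> r : records) {
            if (r.streamId.equals(streamId)) {
                out.add(r.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Tagged<Integer>> multi = Arrays.asList(
                new Tagged<Integer>("s1", 1),
                new Tagged<Integer>("s2", 2),
                new Tagged<Integer>("s1", 3));
        System.out.println(select(multi, "s1"));
        System.out.println(select(multi, "s2"));
    }
}
```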
+
+# Flink Extensions
+
+## Finite Spouts
+
+In Flink, streaming sources can be finite, i.e., emit a finite number of records and stop after emitting the last record. However, Spouts usually emit infinite streams.
+The bridge between the two approaches is the `FiniteSpout` interface which, in addition to `IRichSpout`, contains a `reachedEnd()` method, where the user can specify a stopping condition.
+The user can create a finite Spout by implementing this interface instead of (or in addition to) `IRichSpout` and implementing the `reachedEnd()` method.
+In contrast to a `SpoutWrapper` that is configured to emit a finite number of tuples, the `FiniteSpout` interface allows implementing more complex termination criteria.
+
+Although finite Spouts are not necessary to embed Spouts into a Flink 
streaming program or to submit a whole Storm topology to Flink, there are cases 
where they may come in handy:
+
+ * making a native Spout behave like a finite Flink source with minimal modifications
+ * processing a stream only for some time, after which the Spout stops automatically
+ * reading a file into a stream
+ * testing
+
+An example of a finite Spout that emits records for 10 seconds only:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+public class TimedFiniteSpout extends BaseRichSpout implements FiniteSpout {
+       [...] // implement open(), nextTuple(), ...
+
+       private long starttime = System.currentTimeMillis();
+
+       public boolean reachedEnd() {
+               return System.currentTimeMillis() - starttime > 10000L; // stop after 10 seconds
+       }
+}
+~~~
+</div>
+</div>
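How a stopping condition of this kind ends a source can be sketched in plain Java. This is a sketch under stated assumptions, not the wrapper's actual code: `FiniteSource`, `CountingSource` and `drain(...)` are hypothetical names, and the loop only mirrors the idea of checking `reachedEnd()` before each emission.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; the names below are hypothetical.
final class FiniteSourceSketch {

    // Hypothetical stand-in for a source with a user-defined stopping condition.
    interface FiniteSource<T> {
        T next();               // emit the next record
        boolean reachedEnd();   // true once no more records should be emitted
    }

    // Emits 0, 1, ..., limit - 1 and then reports that the end is reached.
    static final class CountingSource implements FiniteSource<Integer> {
        private final int limit;
        private int emitted = 0;

        CountingSource(int limit) {
            this.limit = limit;
        }

        @Override
        public Integer next() {
            return emitted++;
        }

        @Override
        public boolean reachedEnd() {
            return emitted >= limit;
        }
    }

    // Driver loop: check the stopping condition before every emission.
    static <T> List<T> drain(FiniteSource<T> source) {
        List<T> out = new ArrayList<>();
        while (!source.reachedEnd()) {
            out.add(source.next());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(new CountingSource(3)));
    }
}
```

Because the condition is arbitrary code, it can depend on elapsed time, on the data seen so far, or on external state, which a fixed invocation count cannot express.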
+
+# Storm Compatibility Examples
+
+You can find more examples in Maven module `flink-storm-examples`.
+For the different versions of WordCount, see 
[README.md](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/README.md).
+To run the examples, you need to assemble a correct jar file.
+`flink-storm-examples-{{ site.version }}.jar` is **not** a valid jar file for job execution (it is only a standard Maven artifact).
+
+There are example jars for embedded Spout and Bolt, namely 
`WordCount-SpoutSource.jar` and `WordCount-BoltTokenizer.jar`, respectively.
+Compare `pom.xml` to see how both jars are built.
+Furthermore, there is one example for whole Storm topologies 
(`WordCount-StormTopology.jar`).
+
+You can run each of those examples via `bin/flink run <jarname>.jar`. The 
correct entry point class is contained in each jar's manifest file.

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/local_execution.md
----------------------------------------------------------------------
diff --git a/docs/dev/local_execution.md b/docs/dev/local_execution.md
new file mode 100644
index 0000000..a348951
--- /dev/null
+++ b/docs/dev/local_execution.md
@@ -0,0 +1,125 @@
+---
+title:  "Local Execution"
+nav-parent_id: dev
+nav-pos: 11
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink can run on a single machine, even in a single Java Virtual Machine. This 
allows users to test and debug Flink programs locally. This section gives an 
overview of the local execution mechanisms.
+
+The local environments and executors allow you to run Flink programs in a local Java Virtual Machine, or within any JVM as part of existing programs. Most examples can be launched locally by simply hitting the "Run" button of your IDE.
+
+There are two different kinds of local execution supported in Flink. The `LocalExecutionEnvironment` starts the full Flink runtime, including a JobManager and a TaskManager. This includes memory management and all the internal algorithms that are executed in cluster mode.
+
+The `CollectionEnvironment` executes the Flink program on Java collections. This mode does not start the full Flink runtime, so execution is lightweight with very low overhead. For example, a `DataSet.map()` transformation is executed by applying the `map()` function to all elements in a Java list.
+
+* TOC
+{:toc}
+
+
+## Debugging
+
+If you are running Flink programs locally, you can also debug your program 
like any other Java program. You can either use `System.out.println()` to write 
out some internal variables or you can use the debugger. It is possible to set 
breakpoints within `map()`, `reduce()` and all the other methods.
+Please also refer to the [debugging section]({{ site.baseurl 
}}/dev/batch/index.html#debugging) in the Java API documentation for a guide to 
testing and local debugging utilities in the Java API.
+
+## Maven Dependency
+
+If you are developing your program in a Maven project, you have to add the 
`flink-clients` module using this dependency:
+
+~~~xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version}}</version>
+</dependency>
+~~~
+
+## Local Environment
+
+The `LocalEnvironment` is a handle to local execution for Flink programs. Use 
it to run a program within a local JVM - standalone or embedded in other 
programs.
+
+The local environment is instantiated via the method 
`ExecutionEnvironment.createLocalEnvironment()`. By default, it will use as 
many local threads for execution as your machine has CPU cores (hardware 
contexts). You can alternatively specify the desired parallelism. The local 
environment can be configured to log to the console using 
`enableLogging()`/`disableLogging()`.
+
+In most cases, calling `ExecutionEnvironment.getExecutionEnvironment()` is the better way to go. That method returns a `LocalEnvironment` when the program is started locally (outside the command line interface), and it returns a pre-configured environment for cluster execution when the program is invoked by the [command line interface]({{ site.baseurl }}/setup/cli.html).
+
+~~~java
+public static void main(String[] args) throws Exception {
+    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+
+    DataSet<String> data = env.readTextFile("file:///path/to/file");
+
+    data
+        .filter(new FilterFunction<String>() {
+            public boolean filter(String value) {
+                return value.startsWith("http://");
+            }
+        })
+        .writeAsText("file:///path/to/result");
+
+    JobExecutionResult res = env.execute();
+}
+~~~
+
+The `JobExecutionResult` object, which is returned after the execution 
finished, contains the program runtime and the accumulator results.
+
+The `LocalEnvironment` also allows passing custom configuration values to Flink.
+
+~~~java
+Configuration conf = new Configuration();
+conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
+final ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(conf);
+~~~
+
+*Note:* The local execution environments do not start any web frontend to 
monitor the execution.
+
+## Collection Environment
+
+The execution on Java Collections using the `CollectionEnvironment` is a 
low-overhead approach for executing Flink programs. Typical use-cases for this 
mode are automated tests, debugging and code re-use.
+
+Users can use algorithms implemented for batch processing also for cases that 
are more interactive. A slightly changed variant of a Flink program could be 
used in a Java Application Server for processing incoming requests.
+
+**Skeleton for Collection-based execution**
+
+~~~java
+public static void main(String[] args) throws Exception {
+    // initialize a new Collection-based execution environment
+    final ExecutionEnvironment env = new CollectionEnvironment();
+
+    DataSet<User> users = env.fromCollection( /* get elements from a Java Collection */);
+
+    /* Data Set transformations ... */
+
+    // retrieve the resulting Tuple2 elements into an ArrayList.
+    Collection<...> result = new ArrayList<...>();
+    resultDataSet.output(new LocalCollectionOutputFormat<...>(result));
+
+    // kick off execution.
+    env.execute();
+
+    // Do some work with the resulting ArrayList (=Collection).
+    for(... t : result) {
+        System.err.println("Result = "+t);
+    }
+}
+~~~
+
+The `flink-examples-batch` module contains a full example, called 
`CollectionExecutionExample`.
+
+Please note that collection-based execution of Flink programs is only possible on small data that fits into the JVM heap. Execution on collections is not multi-threaded; only one thread is used.
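As a rough picture of what collection-based execution amounts to, the sketch below applies a map function to every element of an in-memory list, sequentially and in a single thread. It is plain Java for illustration, not Flink code; `CollectionMapSketch` and `mapOnCollection(...)` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration only; this is NOT how Flink is implemented internally.
final class CollectionMapSketch {

    // Apply the function to each element of the input list, one after another,
    // in the calling thread. The whole input and output live on the JVM heap.
    static <I, O> List<O> mapOnCollection(List<I> input, Function<I, O> fn) {
        List<O> out = new ArrayList<>(input.size());
        for (I element : input) {
            out.add(fn.apply(element));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("flink", "storm", "local");
        System.out.println(mapOnCollection(words, String::length));
    }
}
```

This also makes the two limitations stated above visible: everything is held in memory at once, and there is no parallelism.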

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/quickstarts.md
----------------------------------------------------------------------
diff --git a/docs/dev/quickstarts.md b/docs/dev/quickstarts.md
new file mode 100644
index 0000000..ef21ca6
--- /dev/null
+++ b/docs/dev/quickstarts.md
@@ -0,0 +1,24 @@
+---
+title: "Quickstarts"
+nav-id: quickstarts
+nav-parent_id: dev
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/scala_api_extensions.md
----------------------------------------------------------------------
diff --git a/docs/dev/scala_api_extensions.md b/docs/dev/scala_api_extensions.md
new file mode 100644
index 0000000..ffa6145
--- /dev/null
+++ b/docs/dev/scala_api_extensions.md
@@ -0,0 +1,408 @@
+---
+title: "Scala API Extensions"
+nav-parent_id: apis
+nav-pos: 104
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+In order to keep a fair amount of consistency between the Scala and Java APIs, 
some
+of the features that allow a high-level of expressiveness in Scala have been 
left
+out from the standard APIs for both batch and streaming.
+
+If you want to _enjoy the full Scala experience_ you can choose to opt-in to
+extensions that enhance the Scala API via implicit conversions.
+
+To use all the available extensions, you can just add a simple `import` for the
+DataSet API
+
+{% highlight scala %}
+import org.apache.flink.api.scala.extensions._
+{% endhighlight %}
+
+or the DataStream API
+
+{% highlight scala %}
+import org.apache.flink.streaming.api.scala.extensions._
+{% endhighlight %}
+
+Alternatively, you can import individual extensions _à la carte_ to use only those
+you prefer.
+
+## Accept partial functions
+
+Normally, both the DataSet and DataStream APIs don't accept anonymous pattern
+matching functions to deconstruct tuples, case classes or collections, like the
+following:
+
+{% highlight scala %}
+val data: DataSet[(Int, String, Double)] = // [...]
+data.map {
+  case (id, name, temperature) => // [...]
+  // The previous line causes the following compilation error:
+  // "The argument types of an anonymous function must be fully known. (SLS 8.5)"
+}
+{% endhighlight %}
+
+This extension introduces new methods in both the DataSet and DataStream Scala API
+that have a one-to-one correspondence in the extended API. These delegating methods
+do support anonymous pattern matching functions.
+
+#### DataSet API
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Method</th>
+      <th class="text-left" style="width: 20%">Original</th>
+      <th class="text-center">Example</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>mapWith</strong></td>
+      <td><strong>map (DataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.mapWith {
+  case (_, value) => value.toString
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>mapPartitionWith</strong></td>
+      <td><strong>mapPartition (DataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.mapPartitionWith {
+  case head #:: _ => head
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>flatMapWith</strong></td>
+      <td><strong>flatMap (DataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.flatMapWith {
+  case (_, name, visitTimes) => visitTimes.map(name -> _)
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>filterWith</strong></td>
+      <td><strong>filter (DataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.filterWith {
+  case Train(_, isOnTime) => isOnTime
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>reduceWith</strong></td>
+      <td><strong>reduce (DataSet, GroupedDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.reduceWith {
+  case ((_, amount1), (_, amount2)) => amount1 + amount2
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>reduceGroupWith</strong></td>
+      <td><strong>reduceGroup (GroupedDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.reduceGroupWith {
+  case id #:: value #:: _ => id -> value
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>groupingBy</strong></td>
+      <td><strong>groupBy (DataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.groupingBy {
+  case (id, _, _) => id
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>sortGroupWith</strong></td>
+      <td><strong>sortGroup (GroupedDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+grouped.sortGroupWith(Order.ASCENDING) {
+  case House(_, value) => value
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>combineGroupWith</strong></td>
+      <td><strong>combineGroup (GroupedDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+grouped.combineGroupWith {
+  case header #:: amounts => amounts.sum
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>projecting</strong></td>
+      <td><strong>apply (JoinDataSet, CrossDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data1.join(data2).
+  whereClause(case (pk, _) => pk).
+  isEqualTo(case (_, fk) => fk).
+  projecting {
+    case ((pk, tx), (products, fk)) => tx -> products
+  }
+
+data1.cross(data2).projecting {
+  case ((a, _), (_, b)) => a -> b
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>projecting</strong></td>
+      <td><strong>apply (CoGroupDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data1.coGroup(data2).
+  whereClause(case (pk, _) => pk).
+  isEqualTo(case (_, fk) => fk).
+  projecting {
+    case (head1 #:: _, head2 #:: _) => head1 -> head2
+  }
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+#### DataStream API
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Method</th>
+      <th class="text-left" style="width: 20%">Original</th>
+      <th class="text-center">Example</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>mapWith</strong></td>
+      <td><strong>map (DataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.mapWith {
+  case (_, value) => value.toString
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>mapPartitionWith</strong></td>
+      <td><strong>mapPartition (DataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.mapPartitionWith {
+  case head #:: _ => head
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>flatMapWith</strong></td>
+      <td><strong>flatMap (DataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.flatMapWith {
+  case (_, name, visits) => visits.map(name -> _)
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>filterWith</strong></td>
+      <td><strong>filter (DataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.filterWith {
+  case Train(_, isOnTime) => isOnTime
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>keyingBy</strong></td>
+      <td><strong>keyBy (DataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.keyingBy {
+  case (id, _, _) => id
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>mapWith</strong></td>
+      <td><strong>map (ConnectedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.mapWith(
+  map1 = case (_, value) => value.toString,
+  map2 = case (_, _, value, _) => value + 1
+)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>flatMapWith</strong></td>
+      <td><strong>flatMap (ConnectedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.flatMapWith(
+  flatMap1 = case (_, json) => parse(json),
+  flatMap2 = case (_, _, json, _) => parse(json)
+)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>keyingBy</strong></td>
+      <td><strong>keyBy (ConnectedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.keyingBy(
+  key1 = case (_, timestamp) => timestamp,
+  key2 = case (id, _, _) => id
+)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>reduceWith</strong></td>
+      <td><strong>reduce (KeyedDataStream, WindowedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.reduceWith {
+  case ((_, sum1), (_, sum2)) => sum1 + sum2
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>foldWith</strong></td>
+      <td><strong>fold (KeyedDataStream, WindowedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.foldWith(User(bought = 0)) {
+  case (User(b), (_, items)) => User(b + items.size)
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>applyWith</strong></td>
+      <td><strong>apply (WindowedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.applyWith(0)(
+  foldFunction = case (sum, amount) => sum + amount,
+  windowFunction = case (k, w, sum) => // [...]
+)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>projecting</strong></td>
+      <td><strong>apply (JoinedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data1.join(data2).
+  whereClause(case (pk, _) => pk).
+  isEqualTo(case (_, fk) => fk).
+  projecting {
+    case ((pk, tx), (products, fk)) => tx -> products
+  }
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+
+
+For more information on the semantics of each method, please refer to the
+[DataSet]({{ site.baseurl }}/dev/batch/index.html) and [DataStream]({{ site.baseurl }}/dev/datastream_api.html) API documentation.
+
+To use this extension exclusively, you can add the following `import`:
+
+{% highlight scala %}
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+{% endhighlight %}
+
+for the DataSet extensions, and the following one for the DataStream extensions:
+
+{% highlight scala %}
+import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
+{% endhighlight %}
+
+The following snippet shows a minimal example of how to use these extension
+methods together (with the DataSet API):
+
+{% highlight scala %}
+object Main {
+  import org.apache.flink.api.scala.extensions._
+  case class Point(x: Double, y: Double)
+  def main(args: Array[String]): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
+    ds.filterWith {
+      case Point(x, _) => x > 1
+    }.reduceWith {
+      case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2)
+    }.mapWith {
+      case Point(x, y) => (x, y)
+    }.flatMapWith {
+      case (x, y) => Seq("x" -> x, "y" -> y)
+    }.groupingBy {
+      case (id, value) => id
+    }
+  }
+}
+{% endhighlight %}

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/scala_shell.md
----------------------------------------------------------------------
diff --git a/docs/dev/scala_shell.md b/docs/dev/scala_shell.md
new file mode 100644
index 0000000..0728812
--- /dev/null
+++ b/docs/dev/scala_shell.md
@@ -0,0 +1,193 @@
+---
+title: "Scala Shell"
+nav-parent_id: dev
+nav-pos: 10
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink comes with an integrated interactive Scala Shell.
+It can be used in a local setup as well as in a cluster setup.
+
+To use the shell with an integrated Flink cluster just execute:
+
+~~~bash
+bin/start-scala-shell.sh local
+~~~
+
+in the root directory of your binary Flink distribution. To run the shell on a
+cluster, please see the Setup section below.
+
+## Usage
+
+The shell supports Batch and Streaming.
+Two different ExecutionEnvironments are automatically prebound after startup.
+Use `benv` and `senv` to access the batch and streaming environments, respectively.
+
+### DataSet API
+
+The following example will execute the wordcount program in the Scala shell:
+
+~~~scala
+Scala-Flink> val text = benv.fromElements(
+  "To be, or not to be,--that is the question:--",
+  "Whether 'tis nobler in the mind to suffer",
+  "The slings and arrows of outrageous fortune",
+  "Or to take arms against a sea of troubles,")
+Scala-Flink> val counts = text
+    .flatMap { _.toLowerCase.split("\\W+") }
+    .map { (_, 1) }.groupBy(0).sum(1)
+Scala-Flink> counts.print()
+~~~
+
+The `print()` command will automatically send the specified tasks to the JobManager for execution and will show the result of the computation in the terminal.
+
+It is possible to write results to a file. However, in this case you need to call `execute` to run your program:
+
+~~~scala
+Scala-Flink> benv.execute("MyProgram")
+~~~
+
+### DataStream API
+
+Similar to the batch program above, we can execute a streaming program through the DataStream API:
+
+~~~scala
+Scala-Flink> val textStreaming = senv.fromElements(
+  "To be, or not to be,--that is the question:--",
+  "Whether 'tis nobler in the mind to suffer",
+  "The slings and arrows of outrageous fortune",
+  "Or to take arms against a sea of troubles,")
+Scala-Flink> val countsStreaming = textStreaming
+    .flatMap { _.toLowerCase.split("\\W+") }
+    .map { (_, 1) }.keyBy(0).sum(1)
+Scala-Flink> countsStreaming.print()
+Scala-Flink> senv.execute("Streaming Wordcount")
+~~~
+
+Note that in the streaming case, the print operation does not trigger execution directly.
+
+The Flink Shell comes with command history and auto-completion.
+
+
+## Adding external dependencies
+
+It is possible to add external classpaths to the Scala shell. These will be sent to the JobManager automatically alongside your shell program when calling `execute`.
+
+Use the parameter `-a <path/to/jar.jar>` or `--addclasspath <path/to/jar.jar>` to load additional classes.
+
+~~~bash
+bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>
+~~~
+
+
+## Setup
+
+To get an overview of what options the Scala Shell provides, please use
+
+~~~bash
+bin/start-scala-shell.sh --help
+~~~
+
+### Local
+
+To use the shell with an integrated Flink cluster just execute:
+
+~~~bash
+bin/start-scala-shell.sh local
+~~~
+
+
+### Remote
+
+To use it with a running cluster, start the Scala shell with the keyword `remote`
+and supply the host and port of the JobManager:
+
+~~~bash
+bin/start-scala-shell.sh remote <hostname> <portnumber>
+~~~
+
+### Yarn Scala Shell cluster
+
+The shell can deploy a Flink cluster to YARN, which is used exclusively by the
+shell. The number of YARN containers can be controlled by the parameter `-n <arg>`.
+The shell deploys a new Flink cluster on YARN and connects to the
+cluster. You can also specify options for the YARN cluster, such as memory for the
+JobManager, name of the YARN application, etc.
+
+For example, to start a Yarn cluster for the Scala Shell with two TaskManagers
+use the following:
+
+~~~bash
+ bin/start-scala-shell.sh yarn -n 2
+~~~
+
+For all other options, see the full reference at the bottom.
+
+
+### Yarn Session
+
+If you have previously deployed a Flink cluster using the Flink Yarn Session,
+the Scala shell can connect with it using the following command:
+
+~~~bash
+ bin/start-scala-shell.sh yarn
+~~~
+
+
+## Full Reference
+
+~~~bash
+Flink Scala Shell
+Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...
+
+Command: local [options]
+Starts Flink scala shell with a local Flink cluster
+  -a <path/to/jar> | --addclasspath <path/to/jar>
+        Specifies additional jars to be used in Flink
+Command: remote [options] <host> <port>
+Starts Flink scala shell connecting to a remote cluster
+  <host>
+        Remote host name as string
+  <port>
+        Remote port as integer
+
+  -a <path/to/jar> | --addclasspath <path/to/jar>
+        Specifies additional jars to be used in Flink
+Command: yarn [options]
+Starts Flink scala shell connecting to a yarn cluster
+  -n arg | --container arg
+        Number of YARN container to allocate (= Number of TaskManagers)
+  -jm arg | --jobManagerMemory arg
+        Memory for JobManager container [in MB]
+  -nm <value> | --name <value>
+        Set a custom name for the application on YARN
+  -qu <arg> | --queue <arg>
+        Specifies YARN queue
+  -s <arg> | --slots <arg>
+        Number of slots per TaskManager
+  -tm <arg> | --taskManagerMemory <arg>
+        Memory per TaskManager container [in MB]
+  -a <path/to/jar> | --addclasspath <path/to/jar>
+        Specifies additional jars to be used in Flink
+  --configDir <value>
+        The configuration directory.
+  -h | --help
+        Prints this usage text
+~~~

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/state.md b/docs/dev/state.md
new file mode 100644
index 0000000..ec8c5eb
--- /dev/null
+++ b/docs/dev/state.md
@@ -0,0 +1,293 @@
+---
+title: "Working with State"
+nav-parent_id: dev
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+All transformations in Flink may look like functions (in the functional processing terminology), but
+are in fact stateful operators. You can make *every* transformation (`map`, `filter`, etc.) stateful
+by using Flink's state interface or by checkpointing instance fields of your function. You can register
+any instance field as ***managed*** state by implementing an interface. In this case, and also in the case of using
+Flink's native state interface, Flink will automatically take consistent snapshots of your state
+periodically, and restore its value in the case of a failure.
+
+The end effect is that updates to any form of state are the same under 
failure-free execution and
+execution under failures.
+
+First, we look at how to make instance fields consistent under failures, and 
then we look at
+Flink's state interface.
+
+By default, state checkpoints are stored in memory at the JobManager. For proper persistence of large
+state, Flink supports storing the checkpoints on file systems (HDFS, S3, or any mounted POSIX file system),
+which can be configured in the `flink-conf.yaml` or via `StreamExecutionEnvironment.setStateBackend(…)`.
+See [state backends]({{ site.baseurl }}/dev/state_backends.html) for information
+about the available state backends and how to configure them.
+
+* ToC
+{:toc}
+
+## Using the Key/Value State Interface
+
+The Key/Value state interface provides access to different types of state that 
are all scoped to
+the key of the current input element. This means that this type of state can 
only be used
+on a `KeyedStream`, which can be created via `stream.keyBy(…)`.
+
+Now, we will first look at the different types of state available and then we 
will see
+how they can be used in a program. The available state primitives are:
+
+* `ValueState<T>`: This keeps a value that can be updated and
+retrieved (scoped to key of the input element, mentioned above, so there will 
possibly be one value
+for each key that the operation sees). The value can be set using `update(T)` 
and retrieved using
+`T value()`.
+
+* `ListState<T>`: This keeps a list of elements. You can append elements and 
retrieve an `Iterable`
+over all currently stored elements. Elements are added using `add(T)`, the 
Iterable can
+be retrieved using `Iterable<T> get()`.
+
+* `ReducingState<T>`: This keeps a single value that represents the 
aggregation of all values
+added to the state. The interface is the same as for `ListState` but elements 
added using
+`add(T)` are reduced to an aggregate using a specified `ReduceFunction`.
+
+All types of state also have a method `clear()` that clears the state for the 
currently
+active key (i.e. the key of the input element).
+
+It is important to keep in mind that these state objects are only used for 
interfacing
+with state. The state is not necessarily stored inside but might reside on 
disk or somewhere else.
+The second thing to keep in mind is that the value you get from the state
+depends on the key of the input element. So the value you get in one invocation of your
+user function can be different from the one you get in another invocation if 
the key of
+the element is different.
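
As a rough illustration of the key scoping described above, the following plain-Java sketch models a keyed `ListState` and `ReducingState` with one map entry per key. It does not use Flink: `ListStateModel`, `ReducingStateModel`, `setCurrentKey`, and `run` are hypothetical names invented for this sketch, and in Flink it is the runtime, not user code, that selects the current key. Only `add`, `get`, and `clear` mirror the methods named in the text.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of key-scoped state; NOT Flink's implementation.
final class KeyScopedStateSketch {

    /** Hypothetical model of a keyed list state: one list per key. */
    static final class ListStateModel<K, T> {
        private final Map<K, List<T>> perKey = new HashMap<>();
        private K currentKey;

        // Assumption: stands in for the runtime selecting the key of the current element.
        void setCurrentKey(K key) { currentKey = key; }

        void add(T value) {
            perKey.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(value);
        }

        Iterable<T> get() {
            return perKey.getOrDefault(currentKey, new ArrayList<>());
        }

        void clear() { perKey.remove(currentKey); }
    }

    /** Hypothetical model of a keyed reducing state: one aggregate per key. */
    static final class ReducingStateModel<K, T> {
        private final Map<K, T> perKey = new HashMap<>();
        private final BinaryOperator<T> reduceFunction;
        private K currentKey;

        ReducingStateModel(BinaryOperator<T> reduceFunction) {
            this.reduceFunction = reduceFunction;
        }

        void setCurrentKey(K key) { currentKey = key; }

        // Same call as for the list model, but values are folded into one aggregate.
        void add(T value) { perKey.merge(currentKey, value, reduceFunction); }

        T get() { return perKey.get(currentKey); }

        void clear() { perKey.remove(currentKey); }
    }

    /** Feeds (key, value) pairs into both models and reads them back per key. */
    static List<String> run() {
        ListStateModel<String, Integer> all = new ListStateModel<>();
        ReducingStateModel<String, Integer> sum = new ReducingStateModel<>(Integer::sum);

        String[] keys = {"a", "b", "a"};
        int[] values = {1, 10, 2};
        for (int i = 0; i < keys.length; i++) {
            all.setCurrentKey(keys[i]);
            sum.setCurrentKey(keys[i]);
            all.add(values[i]);
            sum.add(values[i]);
        }

        List<String> seen = new ArrayList<>();
        for (String key : new String[] {"a", "b"}) {
            all.setCurrentKey(key);
            sum.setCurrentKey(key);
            seen.add(key + ": list=" + all.get() + ", reduced=" + sum.get());
        }

        // clear() only affects the currently active key ("b" at this point).
        sum.clear();
        sum.setCurrentKey("a");
        seen.add("a after clearing b: reduced=" + sum.get());
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The same `add` call keeps every element in the list model but only a running aggregate in the reducing model, and what `get` returns depends on which key is active at the time of the call.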
+
+To get a state handle, you have to create a `StateDescriptor`. This holds the name of the state
+(as we will see later, you can create several states, and they have to have unique names so
+that you can reference them), the type of the values that the state holds, and possibly
+a user-specified function, such as a `ReduceFunction`. Depending on what type of state you
+want to retrieve, you create one of `ValueStateDescriptor`, `ListStateDescriptor`, or
+`ReducingStateDescriptor`.
+
+State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*.
+Please see [here]({{ site.baseurl }}/apis/common/#specifying-transformation-functions) for
+information about that; we will also see an example shortly. The `RuntimeContext` that
+is available in a `RichFunction` has these methods for accessing state:
+
+* `ValueState<T> getState(ValueStateDescriptor<T>)`
+* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
+* `ListState<T> getListState(ListStateDescriptor<T>)`
+
+This is an example `FlatMapFunction` that shows how all of the parts fit 
together:
+
+{% highlight java %}
+public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+    /**
+     * The ValueState handle. The first field is the count, the second field a running sum.
+     */
+    private transient ValueState<Tuple2<Long, Long>> sum;
+
+    @Override
+    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
+
+        // access the state value
+        Tuple2<Long, Long> currentSum = sum.value();
+
+        // update the count
+        currentSum.f0 += 1;
+
+        // add the second field of the input value
+        currentSum.f1 += input.f1;
+
+        // update the state
+        sum.update(currentSum);
+
+        // if the count reaches 2, emit the average and clear the state
+        if (currentSum.f0 >= 2) {
+            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
+            sum.clear();
+        }
+    }
+
+    @Override
+    public void open(Configuration config) {
+        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+                new ValueStateDescriptor<>(
+                        "average", // the state name
+                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
+                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
+        sum = getRuntimeContext().getState(descriptor);
+    }
+}
+
+// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
+env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
+        .keyBy(0)
+        .flatMap(new CountWindowAverage())
+        .print();
+
+// the printed output will be (1,4) and (1,5)
+{% endhighlight %}
+
+This example implements a poor man's counting window. We key the tuples by the first field
+(in the example all have the same key `1`). The function stores the count and a running sum in
+a `ValueState`. Once the count reaches 2, it emits the average and clears the state so that
+we start over from `0`. Note that this would keep a different state value for each different input
+key if we had tuples with different values in the first field.
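
To check the stated output by hand, the counting-window logic can be replayed without Flink. The sketch below is a plain-Java trace under the assumption that a `HashMap` stands in for the per-key `ValueState` and that removing an entry stands in for `clear()`. `CountWindowTrace` and `run` are names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java replay of the flatMap logic above; NOT a Flink program.
final class CountWindowTrace {

    /** Replays (key, value) pairs and returns the emitted (key, average) pairs. */
    static List<long[]> run(long[][] input) {
        // key -> {count, running sum}; stands in for the keyed ValueState
        Map<Long, long[]> state = new HashMap<>();
        List<long[]> out = new ArrayList<>();

        for (long[] element : input) {
            long key = element[0];
            // a missing entry behaves like the descriptor's default value (0, 0)
            long[] current = state.getOrDefault(key, new long[] {0L, 0L});
            current[0] += 1;          // update the count
            current[1] += element[1]; // add the second field of the input value
            state.put(key, current);

            if (current[0] >= 2) {
                // long division truncates, as in the original example
                out.add(new long[] {key, current[1] / current[0]});
                state.remove(key); // plays the role of clear()
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] input = {{1, 3}, {1, 5}, {1, 7}, {1, 4}, {1, 2}};
        for (long[] result : run(input)) {
            System.out.println("(" + result[0] + "," + result[1] + ")");
        }
    }
}
```

The first two elements sum to 8 and give an average of 4. The next two sum to 11, which truncates to 5 under integer division. The fifth element only starts a new window, so nothing is emitted for it.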
+
+### State in the Scala DataStream API
+
+In addition to the interface described above, the Scala API has shortcuts for 
stateful
+`map()` or `flatMap()` functions with a single `ValueState` on `KeyedStream`. 
The user function
+gets the current value of the `ValueState` in an `Option` and must return an 
updated value that
+will be used to update the state.
+
+{% highlight scala %}
+val stream: DataStream[(String, Int)] = ...
+
+val counts: DataStream[(String, Int)] = stream
+  .keyBy(_._1)
+  .mapWithState((in: (String, Int), count: Option[Int]) =>
+    count match {
+      case Some(c) => ( (in._1, c), Some(c + in._2) )
+      case None => ( (in._1, 0), Some(in._2) )
+    })
+{% endhighlight %}
+
+## Checkpointing Instance Fields
+
+Instance fields can be checkpointed by using the `Checkpointed` interface.
+
+When the user-defined function implements the `Checkpointed` interface, the 
`snapshotState(…)` and `restoreState(…)`
+methods will be executed to draw and restore function state.
+
+In addition to that, user functions can also implement the 
`CheckpointNotifier` interface to receive notifications on
+completed checkpoints via the `notifyCheckpointComplete(long checkpointId)` 
method.
+Note that there is no guarantee for the user function to receive a 
notification if a failure happens between
+checkpoint completion and notification. The notifications should hence be 
treated in a way that notifications from
+later checkpoints can subsume missing notifications.
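
One possible way to let later notifications subsume missing ones is to keep pending work ordered by checkpoint id and to commit everything up to and including the notified id. The following is a plain-Java sketch of that idea only. `SubsumingNotifications`, `onSnapshot`, and `committed` are hypothetical names, the class does not implement any Flink interface, and only the method name `notifyCheckpointComplete(long checkpointId)` is taken from the text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-Java sketch; the class does not implement any Flink interface.
final class SubsumingNotifications {

    // Work to be made visible once its checkpoint is confirmed, ordered by checkpoint id.
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    /** Hypothetical hook: remember what belongs to the given checkpoint. */
    void onSnapshot(long checkpointId, String payload) {
        pending.put(checkpointId, payload);
    }

    /** Commits the notified checkpoint and every older one that is still pending. */
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, String> done = pending.headMap(checkpointId, true);
        committed.addAll(done.values()); // ascending checkpoint order
        done.clear();                    // the view removes the entries from 'pending'
    }

    List<String> committed() {
        return committed;
    }

    public static void main(String[] args) {
        SubsumingNotifications fn = new SubsumingNotifications();
        fn.onSnapshot(1, "batch-1");
        fn.onSnapshot(2, "batch-2");
        fn.onSnapshot(3, "batch-3");

        // Assume the notifications for checkpoints 1 and 2 were lost.
        fn.notifyCheckpointComplete(3);
        System.out.println(fn.committed());

        // A late or repeated notification for an older checkpoint is a no-op.
        fn.notifyCheckpointComplete(2);
        System.out.println(fn.committed());
    }
}
```

Because the notification for checkpoint 3 also commits the work of checkpoints 1 and 2, the function does not depend on ever seeing the two lost notifications.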
+
+The above example for `ValueState` can be implemented using instance fields 
like this:
+
+{% highlight java %}
+
+public class CountWindowAverage
+        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+        implements Checkpointed<Tuple2<Long, Long>> {
+
+    private Tuple2<Long, Long> sum = null;
+
+    @Override
+    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
+
+        // update the count
+        sum.f0 += 1;
+
+        // add the second field of the input value
+        sum.f1 += input.f1;
+
+
+        // if the count reaches 2, emit the average and clear the state
+        if (sum.f0 >= 2) {
+            out.collect(new Tuple2<>(input.f0, sum.f1 / sum.f0));
+            sum = Tuple2.of(0L, 0L);
+        }
+    }
+
+    @Override
+    public void open(Configuration config) {
+        if (sum == null) {
+            // only recreate if null
+            // restoreState will be called before open()
+            // so this will already set the sum to the restored value
+            sum = Tuple2.of(0L, 0L);
+        }
+    }
+
+    // regularly persists state during normal operation
+    @Override
+    public Tuple2<Long, Long> snapshotState(long checkpointId, long checkpointTimestamp) {
+        return sum;
+    }
+
+    // restores state on recovery from failure
+    @Override
+    public void restoreState(Tuple2<Long, Long> state) {
+        sum = state;
+    }
+}
+{% endhighlight %}
+
+## Stateful Source Functions
+
+Stateful sources require a bit more care than other operators.
+In order to make the updates to the state and output collection atomic (required for exactly-once semantics
+on failure/recovery), the user is required to get a lock from the source's context.
+
+{% highlight java %}
+public static class CounterSource
+        extends RichParallelSourceFunction<Long>
+        implements Checkpointed<Long> {
+
+    /**  current offset for exactly once semantics */
+    private long offset;
+
+    /** flag for job cancellation */
+    private volatile boolean isRunning = true;
+
+    @Override
+    public void run(SourceContext<Long> ctx) {
+        final Object lock = ctx.getCheckpointLock();
+
+        while (isRunning) {
+            // output and state update are atomic
+            synchronized (lock) {
+                ctx.collect(offset);
+                offset += 1;
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
+
+    @Override
+    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+        return offset;
+    }
+
+    @Override
+    public void restoreState(Long state) {
+        offset = state;
+    }
+}
+{% endhighlight %}
+
+Some operators might need to know when a checkpoint is fully acknowledged by Flink in order to communicate that to the outside world. In this case, see the `flink.streaming.api.checkpoint.CheckpointNotifier` interface.
+
+## State Checkpoints in Iterative Jobs
+
+Flink currently only provides processing guarantees for jobs without 
iterations. Enabling checkpointing on an iterative job causes an exception. In 
order to force checkpointing on an iterative program the user needs to set a 
special flag when enabling checkpointing: `env.enableCheckpointing(interval, 
force = true)`.
+
+Please note that records in flight in the loop edges (and the state changes 
associated with them) will be lost during failure.
+
+{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/state_backends.md
----------------------------------------------------------------------
diff --git a/docs/dev/state_backends.md b/docs/dev/state_backends.md
new file mode 100644
index 0000000..e5b9c2a
--- /dev/null
+++ b/docs/dev/state_backends.md
@@ -0,0 +1,162 @@
+---
+title: "State Backends"
+nav-parent_id: dev
+nav-pos: 5
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Programs written in the [Data Stream API]({{ site.baseurl 
}}/dev/datastream_api.html) often hold state in various forms:
+
+- Windows gather elements or aggregates until they are triggered
+- Transformation functions may use the key/value state interface to store 
values
+- Transformation functions may implement the `Checkpointed` interface to make 
their local variables fault tolerant
+
+See also [Working with State]({{ site.baseurl }}/dev/state.html) in the 
streaming API guide.
+
+When checkpointing is activated, such state is persisted upon checkpoints to 
guard against data loss and recover consistently.
+How the state is represented internally, and how and where it is persisted 
upon checkpoints depends on the
+chosen **State Backend**.
+
+* ToC
+{:toc}
+
+## Available State Backends
+
+Out of the box, Flink bundles these state backends:
+
+ - *MemoryStateBackend*
+ - *FsStateBackend*
+ - *RocksDBStateBackend*
+
+If nothing else is configured, the system will use the MemoryStateBackend.
+
+
+### The MemoryStateBackend
+
+The *MemoryStateBackend* holds data internally as objects on the Java heap. Key/value state and window operators hold hash tables
+that store the values, triggers, etc.
+
+Upon checkpoints, this state backend will snapshot the state and send it as 
part of the checkpoint acknowledgement messages to the
+JobManager (master), which stores it on its heap as well.
+
+Limitations of the MemoryStateBackend:
+
+  - The size of each individual state is by default limited to 5 MB. This value can be increased in the constructor of the MemoryStateBackend.
+  - Irrespective of the configured maximal state size, the state cannot be larger than the Akka frame size (see [Configuration]({{ site.baseurl }}/setup/config.html)).
+  - The aggregate state must fit into the JobManager memory.
+
+The MemoryStateBackend is encouraged for:
+
+  - Local development and debugging
+  - Jobs that hold little state, such as jobs that consist only of record-at-a-time functions (Map, FlatMap, Filter, ...). The Kafka Consumer requires very little state.
+
+
+### The FsStateBackend
+
+The *FsStateBackend* is configured with a file system URL (type, address, 
path), such as "hdfs://namenode:40010/flink/checkpoints" or 
"file:///data/flink/checkpoints".
+
+The FsStateBackend holds in-flight data in the TaskManager's memory. Upon 
checkpointing, it writes state snapshots into files in the configured file 
system and directory. Minimal metadata is stored in the JobManager's memory 
(or, in high-availability mode, in the metadata checkpoint).
+
+The FsStateBackend is encouraged for:
+
+  - Jobs with large state, long windows, large key/value states.
+  - All high-availability setups.
+
+### The RocksDBStateBackend
+
+The *RocksDBStateBackend* is configured with a file system URL (type, address, 
path), such as "hdfs://namenode:40010/flink/checkpoints" or 
"file:///data/flink/checkpoints".
+
+The RocksDBStateBackend holds in-flight data in a [RocksDB](http://rocksdb.org) database
+that is (by default) stored in the TaskManager data directories. Upon checkpointing, the whole
+RocksDB database will be checkpointed into the configured file system and directory. Minimal
+metadata is stored in the JobManager's memory (or, in high-availability mode, in the metadata checkpoint).
+
+The RocksDBStateBackend is encouraged for:
+
+  - Jobs with very large state, long windows, large key/value states.
+  - All high-availability setups.
+
+Note that the amount of state that you can keep is only limited by the amount of disk space available.
+This allows keeping very large state, compared to the FsStateBackend that keeps state in memory.
+This also means, however, that the maximum throughput that can be achieved will be lower with
+this state backend.
+
+**NOTE:** To use the RocksDBStateBackend you also have to add the correct Maven dependency to your
+project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-statebackend-rocksdb{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+The backend is currently not part of the binary distribution. See
+[here]({{ site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
+for an explanation of how to include it for cluster execution.
+
+## Configuring a State Backend
+
+State backends can be configured per job. In addition, you can define a 
default state backend to be used when the
+job does not explicitly define a state backend.
+
+
+### Setting the Per-job State Backend
+
+The per-job state backend is set on the `StreamExecutionEnvironment` of the 
job, as shown in the example below:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))
+{% endhighlight %}
+</div>
+</div>
+
+
+### Setting Default State Backend
+
+A default state backend can be configured in the `flink-conf.yaml`, using the 
configuration key `state.backend`.
+
+Possible values for the config entry are *jobmanager* (MemoryStateBackend), 
*filesystem* (FsStateBackend), or the fully qualified class
+name of the class that implements the state backend factory 
[FsStateBackendFactory](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java).
+
+In the case where the default state backend is set to *filesystem*, the entry 
`state.backend.fs.checkpointdir` defines the directory where the checkpoint 
data will be stored.
+
+A sample section in the configuration file could look as follows:
+
+~~~
+# The backend that will be used to store operator state checkpoints
+
+state.backend: filesystem
+
+
+# Directory for storing checkpoints
+
+state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints
+~~~
