http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/batch/iterations.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/iterations.md b/docs/dev/batch/iterations.md
new file mode 100644
index 0000000..47910d0
--- /dev/null
+++ b/docs/dev/batch/iterations.md
@@ -0,0 +1,212 @@
+---
+title:  "Iterations"
+
+# Sub-level navigation
+sub-nav-group: batch
+sub-nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Iterative algorithms occur in many domains of data analysis, such as *machine 
learning* or *graph analysis*. Such algorithms are crucial in order to realize 
the promise of Big Data to extract meaningful information out of your data. 
With increasing interest to run these kinds of algorithms on very large data 
sets, there is a need to execute iterations in a massively parallel fashion.
+
+Flink programs implement iterative algorithms by defining a **step function** 
and embedding it into a special iteration operator. There are two  variants of 
this operator: **Iterate** and **Delta Iterate**. Both operators repeatedly 
invoke the step function on the current iteration state until a certain 
termination condition is reached.
+
+Here, we provide background on both operator variants and outline their usage. 
The [programming guide](index.html) explains how to implement the operators in 
both Scala and Java. We also support both **vertex-centric and gather-sum-apply 
iterations** through Flink's graph processing API, 
[Gelly]({{site.baseurl}}/dev/libs/gelly/index.html).
+
+The following table provides an overview of both operators:
+
+<table class="table table-striped table-hover table-bordered">
+       <thead>
+               <th></th>
+               <th class="text-center">Iterate</th>
+               <th class="text-center">Delta Iterate</th>
+       </thead>
+       <tr>
+               <td class="text-center" width="20%"><strong>Iteration 
Input</strong></td>
+               <td class="text-center" width="40%"><strong>Partial 
Solution</strong></td>
+               <td class="text-center" width="40%"><strong>Workset</strong> 
and <strong>Solution Set</strong></td>
+       </tr>
+       <tr>
+               <td class="text-center"><strong>Step Function</strong></td>
+               <td colspan="2" class="text-center">Arbitrary Data Flows</td>
+       </tr>
+       <tr>
+               <td class="text-center"><strong>State Update</strong></td>
+               <td class="text-center">Next <strong>partial 
solution</strong></td>
+               <td>
+                       <ul>
+                               <li>Next workset</li>
+                               <li><strong>Changes to solution 
set</strong></li>
+                       </ul>
+               </td>
+       </tr>
+       <tr>
+               <td class="text-center"><strong>Iteration Result</strong></td>
+               <td class="text-center">Last partial solution</td>
+               <td class="text-center">Solution set state after last 
iteration</td>
+       </tr>
+       <tr>
+               <td class="text-center"><strong>Termination</strong></td>
+               <td>
+                       <ul>
+                               <li><strong>Maximum number of 
iterations</strong> (default)</li>
+                               <li>Custom aggregator convergence</li>
+                       </ul>
+               </td>
+               <td>
+                       <ul>
+                               <li><strong>Maximum number of iterations or 
empty workset</strong> (default)</li>
+                               <li>Custom aggregator convergence</li>
+                       </ul>
+               </td>
+       </tr>
+</table>
+
+
+* This will be replaced by the TOC
+{:toc}
+
+Iterate Operator
+----------------
+
+The **iterate operator** covers the *simple form of iterations*: in each 
iteration, the **step function** consumes the **entire input** (the *result of 
the previous iteration*, or the *initial data set*), and computes the **next 
version of the partial solution** (e.g. `map`, `reduce`, `join`, etc.).
+
+<p class="text-center">
+    <img alt="Iterate Operator" width="60%" 
src="fig/iterations_iterate_operator.png" />
+</p>
+
+  1. **Iteration Input**: Initial input for the *first iteration* from a *data 
source* or *previous operators*.
+  2. **Step Function**: The step function will be executed in each iteration. 
It is an arbitrary data flow consisting of operators like `map`, `reduce`, 
`join`, etc. and depends on your specific task at hand.
+  3. **Next Partial Solution**: In each iteration, the output of the step 
function will be fed back into the *next iteration*.
+  4. **Iteration Result**: Output of the *last iteration* is written to a 
*data sink* or used as input to the *following operators*.
+
+There are multiple options to specify **termination conditions** for an 
iteration:
+
+  - **Maximum number of iterations**: Without any further conditions, the 
iteration will be executed this many times.
+  - **Custom aggregator convergence**: Iterations allow to specify *custom 
aggregators* and *convergence criteria* like sum aggregate the number of 
emitted records (aggregator) and terminate if this number is zero (convergence 
criterion).
+
+You can also think about the iterate operator in pseudo-code:
+
+~~~java
+IterationState state = getInitialState();
+
+while (!terminationCriterion()) {
+       state = step(state);
+}
+
+setFinalState(state);
+~~~
+
+<div class="panel panel-default">
+       <div class="panel-body">
+       See the <strong><a href="index.html">Programming Guide</a> </strong> 
for details and code examples.</div>
+</div>
+
+### Example: Incrementing Numbers
+
+In the following example, we **iteratively incremenet a set numbers**:
+
+<p class="text-center">
+    <img alt="Iterate Operator Example" width="60%" 
src="fig/iterations_iterate_operator_example.png" />
+</p>
+
+  1. **Iteration Input**: The inital input is read from a data source and 
consists of five single-field records (integers `1` to `5`).
+  2. **Step function**: The step function is a single `map` operator, which 
increments the integer field from `i` to `i+1`. It will be applied to every 
record of the input.
+  3. **Next Partial Solution**: The output of the step function will be the 
output of the map operator, i.e. records with incremented integers.
+  4. **Iteration Result**: After ten iterations, the initial numbers will have 
been incremented ten times, resulting in integers `11` to `15`.
+
+~~~
+// 1st           2nd                       10th
+map(1) -> 2      map(2) -> 3      ...      map(10) -> 11
+map(2) -> 3      map(3) -> 4      ...      map(11) -> 12
+map(3) -> 4      map(4) -> 5      ...      map(12) -> 13
+map(4) -> 5      map(5) -> 6      ...      map(13) -> 14
+map(5) -> 6      map(6) -> 7      ...      map(14) -> 15
+~~~
+
+Note that **1**, **2**, and **4** can be arbitrary data flows.
+
+
+Delta Iterate Operator
+----------------------
+
+The **delta iterate operator** covers the case of **incremental iterations**. 
Incremental iterations **selectively modify elements** of their **solution** 
and evolve the solution rather than fully recompute it.
+
+Where applicable, this leads to **more efficient algorithms**, because not 
every element in the solution set changes in each iteration. This allows to 
**focus on the hot parts** of the solution and leave the **cold parts 
untouched**. Frequently, the majority of the solution cools down comparatively 
fast and the later iterations operate only on a small subset of the data.
+
+<p class="text-center">
+    <img alt="Delta Iterate Operator" width="60%" 
src="fig/iterations_delta_iterate_operator.png" />
+</p>
+
+  1. **Iteration Input**: The initial workset and solution set are read from 
*data sources* or *previous operators* as input to the first iteration.
+  2. **Step Function**: The step function will be executed in each iteration. 
It is an arbitrary data flow consisting of operators like `map`, `reduce`, 
`join`, etc. and depends on your specific task at hand.
+  3. **Next Workset/Update Solution Set**: The *next workset* drives the 
iterative computation and will be fed back into the *next iteration*. 
Furthermore, the solution set will be updated and implicitly forwarded (it is 
not required to be rebuild). Both data sets can be updated by different 
operators of the step function.
+  4. **Iteration Result**: After the *last iteration*, the *solution set* is 
written to a *data sink* or used as input to the *following operators*.
+
+The default **termination condition** for delta iterations is specified by the 
**empty workset convergence criterion** and a **maximum number of iterations**. 
The iteration will terminate when a produced *next workset* is empty or when 
the maximum number of iterations is reached. It is also possible to specify a 
**custom aggregator** and **convergence criterion**.
+
+You can also think about the iterate operator in pseudo-code:
+
+~~~java
+IterationState workset = getInitialState();
+IterationState solution = getInitialSolution();
+
+while (!terminationCriterion()) {
+       (delta, workset) = step(workset, solution);
+
+       solution.update(delta)
+}
+
+setFinalState(solution);
+~~~
+
+<div class="panel panel-default">
+       <div class="panel-body">
+       See the <strong><a href="index.html">programming guide</a></strong> for 
details and code examples.</div>
+</div>
+
+### Example: Propagate Minimum in Graph
+
+In the following example, every vertex has an **ID** and a **coloring**. Each 
vertex will propagate its vertex ID to neighboring vertices. The **goal** is to 
*assign the minimum ID to every vertex in a subgraph*. If a received ID is 
smaller then the current one, it changes to the color of the vertex with the 
received ID. One application of this can be found in *community analysis* or 
*connected components* computation.
+
+<p class="text-center">
+    <img alt="Delta Iterate Operator Example" width="100%" 
src="fig/iterations_delta_iterate_operator_example.png" />
+</p>
+
+The **initial input** is set as **both workset and solution set.** In the 
above figure, the colors visualize the **evolution of the solution set**. With 
each iteration, the color of the minimum ID is spreading in the respective 
subgraph. At the same time, the amount of work (exchanged and compared vertex 
IDs) decreases with each iteration. This corresponds to the **decreasing size 
of the workset**, which goes from all seven vertices to zero after three 
iterations, at which time the iteration terminates. The **important 
observation** is that *the lower subgraph converges before the upper half* does 
and the delta iteration is able to capture this with the workset abstraction.
+
+In the upper subgraph **ID 1** (*orange*) is the **minimum ID**. In the 
**first iteration**, it will get propagated to vertex 2, which will 
subsequently change its color to orange. Vertices 3 and 4 will receive **ID 2** 
(in *yellow*) as their current minimum ID and change to yellow. Because the 
color of *vertex 1* didn't change in the first iteration, it can be skipped it 
in the next workset.
+
+In the lower subgraph **ID 5** (*cyan*) is the **minimum ID**. All vertices of 
the lower subgraph will receive it in the first iteration. Again, we can skip 
the unchanged vertices (*vertex 5*) for the next workset.
+
+In the **2nd iteration**, the workset size has already decreased from seven to 
five elements (vertices 2, 3, 4, 6, and 7). These are part of the iteration and 
further propagate their current minimum IDs. After this iteration, the lower 
subgraph has already converged (**cold part** of the graph), as it has no 
elements in the workset, whereas the upper half needs a further iteration 
(**hot part** of the graph) for the two remaining workset elements (vertices 3 
and 4).
+
+The iteration **terminates**, when the workset is empty after the **3rd 
iteration**.
+
+<a href="#supersteps"></a>
+
+Superstep Synchronization
+-------------------------
+
+We referred to each execution of the step function of an iteration operator as 
*a single iteration*. In parallel setups, **multiple instances of the step 
function are evaluated in parallel** on different partitions of the iteration 
state. In many settings, one evaluation of the step function on all parallel 
instances forms a so called **superstep**, which is also the granularity of 
synchronization. Therefore, *all* parallel tasks of an iteration need to 
complete the superstep, before a next superstep will be initialized. 
**Termination criteria** will also be evaluated at superstep barriers.
+
+<p class="text-center">
+    <img alt="Supersteps" width="50%" src="fig/iterations_supersteps.png" />
+</p>

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/batch/python.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/python.md b/docs/dev/batch/python.md
new file mode 100644
index 0000000..09a4fa8
--- /dev/null
+++ b/docs/dev/batch/python.md
@@ -0,0 +1,635 @@
+---
+title: "Python Programming Guide"
+is_beta: true
+nav-title: Python API
+nav-parent_id: batch
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Analysis programs in Flink are regular programs that implement transformations 
on data sets
+(e.g., filtering, mapping, joining, grouping). The data sets are initially 
created from certain
+sources (e.g., by reading files, or from collections). Results are returned 
via sinks, which may for
+example write the data to (distributed) files, or to standard output (for 
example the command line
+terminal). Flink programs run in a variety of contexts, standalone, or 
embedded in other programs.
+The execution can happen in a local JVM, or on clusters of many machines.
+
+In order to create your own Flink program, we encourage you to start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections act as references 
for additional
+operations and advanced features.
+
+* This will be replaced by the TOC
+{:toc}
+
+Example Program
+---------------
+
+The following program is a complete, working example of WordCount. You can 
copy &amp; paste the code
+to run it locally.
+
+{% highlight python %}
+from flink.plan.Environment import get_environment
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+
+class Adder(GroupReduceFunction):
+  def reduce(self, iterator, collector):
+    count, word = iterator.next()
+    count += sum([x[0] for x in iterator])
+    collector.collect((count, word))
+
+env = get_environment()
+data = env.from_elements("Who's there?",
+ "I think I hear them. Stand, ho! Who's there?")
+
+data \
+  .flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
+  .group_by(1) \
+  .reduce_group(Adder(), combinable=True) \
+  .output()
+
+env.execute(local=True)
+{% endhighlight %}
+
+{% top %}
+
+Program Skeleton
+----------------
+
+As we already saw in the example, Flink programs look like regular python 
programs.
+Each program consists of the same basic parts:
+
+1. Obtain an `Environment`,
+2. Load/create the initial data,
+3. Specify transformations on this data,
+4. Specify where to put the results of your computations, and
+5. Execute your program.
+
+We will now give an overview of each of those steps but please refer to the 
respective sections for
+more details.
+
+
+The `Environment` is the basis for all Flink programs. You can
+obtain one using these static methods on class `Environment`:
+
+{% highlight python %}
+get_environment()
+{% endhighlight %}
+
+For specifying data sources the execution environment has several methods
+to read from files. To just read a text file as a sequence of lines, you can 
use:
+
+{% highlight python %}
+env = get_environment()
+text = env.read_text("file:///path/to/file")
+{% endhighlight %}
+
+This will give you a DataSet on which you can then apply transformations. For
+more information on data sources and input formats, please refer to
+[Data Sources](#data-sources).
+
+Once you have a DataSet you can apply transformations to create a new
+DataSet which you can then write to a file, transform again, or
+combine with other DataSets. You apply transformations by calling
+methods on DataSet with your own custom transformation function. For example,
+a map transformation looks like this:
+
+{% highlight python %}
+data.map(lambda x: x*2)
+{% endhighlight %}
+
+This will create a new DataSet by doubling every value in the original DataSet.
+For more information and a list of all the transformations,
+please refer to [Transformations](#transformations).
+
+Once you have a DataSet that needs to be written to disk you can call one
+of these methods on DataSet:
+
+{% highlight python %}
+data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)
+write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', 
write_mode=Constants.NO_OVERWRITE)
+output()
+{% endhighlight %}
+
+The last method is only useful for developing/debugging on a local machine,
+it will output the contents of the DataSet to standard output. (Note that in
+a cluster, the result goes to the standard out stream of the cluster nodes and 
ends
+up in the *.out* files of the workers).
+The first two do as the name suggests.
+Please refer to [Data Sinks](#data-sinks) for more information on writing to 
files.
+
+Once you specified the complete program you need to call `execute` on
+the `Environment`. This will either execute on your local machine or submit 
your program
+for execution on a cluster, depending on how Flink was started. You can force
+a local execution by using `execute(local=True)`.
+
+{% top %}
+
+Project setup
+---------------
+
+Apart from setting up Flink, no additional work is required. The python 
package can be found in the /resource folder of your Flink distribution. The 
flink package, along with the plan and optional packages are automatically 
distributed among the cluster via HDFS when running a job.
+
+The Python API was tested on Linux/Windows systems that have Python 2.7 or 3.4 
installed.
+
+By default Flink will start python processes by calling "python" or "python3", 
depending on which start-script
+was used. By setting the "python.binary.python[2/3]" key in the 
flink-conf.yaml you can modify this behaviour to use a binary of your choice.
+
+{% top %}
+
+Lazy Evaluation
+---------------
+
+All Flink programs are executed lazily: When the program's main method is 
executed, the data loading
+and transformations do not happen directly. Rather, each operation is created 
and added to the
+program's plan. The operations are actually executed when one of the 
`execute()` methods is invoked
+on the Environment object. Whether the program is executed locally or on a 
cluster depends
+on the environment of the program.
+
+The lazy evaluation lets you construct sophisticated programs that Flink 
executes as one
+holistically planned unit.
+
+{% top %}
+
+
+Transformations
+---------------
+
+Data transformations transform one or more DataSets into a new DataSet. 
Programs can combine
+multiple transformations into sophisticated assemblies.
+
+This section gives a brief overview of the available transformations. The 
[transformations
+documentation](dataset_transformations.html) has a full description of all 
transformations with
+examples.
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>Map</strong></td>
+      <td>
+        <p>Takes one element and produces one element.</p>
+{% highlight python %}
+data.map(lambda x: x * 2)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>FlatMap</strong></td>
+      <td>
+        <p>Takes one element and produces zero, one, or more elements. </p>
+{% highlight python %}
+data.flat_map(
+  lambda x,c: [(1,word) for word in line.lower().split() for line in x])
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>MapPartition</strong></td>
+      <td>
+        <p>Transforms a parallel partition in a single function call. The 
function get the partition
+        as an `Iterator` and can produce an arbitrary number of result values. 
The number of
+        elements in each partition depends on the parallelism and previous 
operations.</p>
+{% highlight python %}
+data.map_partition(lambda x,c: [value * 2 for value in x])
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Filter</strong></td>
+      <td>
+        <p>Evaluates a boolean function for each element and retains those for 
which the function
+        returns true.</p>
+{% highlight python %}
+data.filter(lambda x: x > 1000)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Reduce</strong></td>
+      <td>
+        <p>Combines a group of elements into a single element by repeatedly 
combining two elements
+        into one. Reduce may be applied on a full data set, or on a grouped 
data set.</p>
+{% highlight python %}
+data.reduce(lambda x,y : x + y)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>ReduceGroup</strong></td>
+      <td>
+        <p>Combines a group of elements into one or more elements. ReduceGroup 
may be applied on a
+        full data set, or on a grouped data set.</p>
+{% highlight python %}
+class Adder(GroupReduceFunction):
+  def reduce(self, iterator, collector):
+    count, word = iterator.next()
+    count += sum([x[0] for x in iterator)      
+    collector.collect((count, word))
+
+data.reduce_group(Adder())
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Aggregate</strong></td>
+      <td>
+        <p>Performs a built-in operation (sum, min, max) on one field of all 
the Tuples in a
+        data set or in each group of a data set. Aggregation can be applied on 
a full dataset
+        or on a grouped data set.</p>
+{% highlight python %}
+# This code finds the sum of all of the values in the first field and the 
maximum of all of the values in the second field
+data.aggregate(Aggregation.Sum, 0).and_agg(Aggregation.Max, 1)
+
+# min(), max(), and sum() syntactic sugar functions are also available
+data.sum(0).and_agg(Aggregation.Max, 1)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    </tr>
+      <td><strong>Join</strong></td>
+      <td>
+        Joins two data sets by creating all pairs of elements that are equal 
on their keys.
+        Optionally uses a JoinFunction to turn the pair of elements into a 
single element.
+        See <a href="#specifying-keys">keys</a> on how to define join keys.
+{% highlight python %}
+# In this case tuple fields are used as keys.
+# "0" is the join field on the first tuple
+# "1" is the join field on the second tuple.
+result = input1.join(input2).where(0).equal_to(1)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>CoGroup</strong></td>
+      <td>
+        <p>The two-dimensional variant of the reduce operation. Groups each 
input on one or more
+        fields and then joins the groups. The transformation function is 
called per pair of groups.
+        See <a href="#specifying-keys">keys</a> on how to define coGroup 
keys.</p>
+{% highlight python %}
+data1.co_group(data2).where(0).equal_to(1)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Cross</strong></td>
+      <td>
+        <p>Builds the Cartesian product (cross product) of two inputs, 
creating all pairs of
+        elements. Optionally uses a CrossFunction to turn the pair of elements 
into a single
+        element.</p>
+{% highlight python %}
+result = data1.cross(data2)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Union</strong></td>
+      <td>
+        <p>Produces the union of two data sets.</p>
+{% highlight python %}
+data.union(data2)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>ZipWithIndex</strong></td>
+      <td>
+        <p>Assigns consecutive indexes to each element. For more information, 
please refer to
+        the [Zip Elements 
Guide](zip_elements_guide.html#zip-with-a-dense-index).</p>
+{% highlight python %}
+data.zip_with_index()
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+{% top %}
+
+
+Specifying Keys
+-------------
+
+Some transformations (like Join or CoGroup) require that a key is defined on
+its argument DataSets, and other transformations (Reduce, GroupReduce) allow 
that the DataSet is grouped on a key before they are
+applied.
+
+A DataSet is grouped as
+{% highlight python %}
+reduced = data \
+  .group_by(<define key here>) \
+  .reduce_group(<do something>)
+{% endhighlight %}
+
+The data model of Flink is not based on key-value pairs. Therefore,
+you do not need to physically pack the data set types into keys and
+values. Keys are "virtual": they are defined as functions over the
+actual data to guide the grouping operator.
+
+### Define keys for Tuples
+{:.no_toc}
+
+The simplest case is grouping a data set of Tuples on one or more
+fields of the Tuple:
+{% highlight python %}
+reduced = data \
+  .group_by(0) \
+  .reduce_group(<do something>)
+{% endhighlight %}
+
+The data set is grouped on the first field of the tuples.
+The group-reduce function will thus receive groups of tuples with
+the same value in the first field.
+
+{% highlight python %}
+grouped = data \
+  .group_by(0,1) \
+  .reduce(/*do something*/)
+{% endhighlight %}
+
+The data set is grouped on the composite key consisting of the first and the
+second fields, therefore the reduce function will receive groups
+with the same value for both fields.
+
+A note on nested Tuples: If you have a DataSet with a nested tuple
+specifying `group_by(<index of tuple>)` will cause the system to use the full 
tuple as a key.
+
+{% top %}
+
+
+Passing Functions to Flink
+--------------------------
+
+Certain operations require user-defined functions, whereas all of them accept 
lambda functions and rich functions as arguments.
+
+{% highlight python %}
+data.filter(lambda x: x > 5)
+{% endhighlight %}
+
+{% highlight python %}
+class Filter(FilterFunction):
+    def filter(self, value):
+        return value > 5
+
+data.filter(Filter())
+{% endhighlight %}
+
+Rich functions allow the use of imported functions, provide access to 
broadcast-variables,
+can be parameterized using __init__(), and are the go-to-option for complex 
functions.
+They are also the only way to define an optional `combine` function for a 
reduce operation.
+
+Lambda functions allow the easy insertion of one-liners. Note that a lambda 
function has to return
+an iterable, if the operation can return multiple values. (All functions 
receiving a collector argument)
+
+{% top %}
+
+Data Types
+----------
+
+Flink's Python API currently only offers native support for primitive python 
types (int, float, bool, string) and byte arrays.
+
+The type support can be extended by passing a serializer, deserializer and 
type class to the environment.
+{% highlight python %}
+class MyObj(object):
+    def __init__(self, i):
+        self.value = i
+
+
+class MySerializer(object):
+    def serialize(self, value):
+        return struct.pack(">i", value.value)
+
+
+class MyDeserializer(object):
+    def _deserialize(self, read):
+        i = struct.unpack(">i", read(4))[0]
+        return MyObj(i)
+
+
+env.register_custom_type(MyObj, MySerializer(), MyDeserializer())
+{% endhighlight %}
+
+#### Tuples/Lists
+
+You can use the tuples (or lists) for composite types. Python tuples are 
mapped to the Flink Tuple type, that contain
+a fix number of fields of various types (up to 25). Every field of a tuple can 
be a primitive type - including further tuples, resulting in nested tuples.
+
+{% highlight python %}
+word_counts = env.from_elements(("hello", 1), ("world",2))
+
+counts = word_counts.map(lambda x: x[1])
+{% endhighlight %}
+
+When working with operators that require a Key for grouping or matching 
records,
+Tuples let you simply specify the positions of the fields to be used as key. 
You can specify more
+than one position to use composite keys (see [Section Data 
Transformations](#transformations)).
+
+{% highlight python %}
+wordCounts \
+    .group_by(0) \
+    .reduce(MyReduceFunction())
+{% endhighlight %}
+
+{% top %}
+
+Data Sources
+------------
+
+Data sources create the initial data sets, such as from files or from 
collections.
+
+File-based:
+
+- `read_text(path)` - Reads files line wise and returns them as Strings.
+- `read_csv(path, type)` - Parses files of comma (or another char) delimited 
fields.
+  Returns a DataSet of tuples. Supports the basic java types and their Value 
counterparts as field
+  types.
+
+Collection-based:
+
+- `from_elements(*args)` - Creates a data set from a Seq. All elements
+- `generate_sequence(from, to)` - Generates the sequence of numbers in the 
given interval, in parallel.
+
+**Examples**
+
+{% highlight python %}
+env  = get_environment
+
+\# read text file from local files system
+localLiens = env.read_text("file:#/path/to/my/textfile")
+
+\# read text file from a HDFS running at nnHost:nnPort
+hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")
+
+\# read a CSV file with three fields, schema defined using constants defined 
in flink.plan.Constants
+csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))
+
+\# create a set from some given elements
+values = env.from_elements("Foo", "bar", "foobar", "fubar")
+
+\# generate a number sequence
+numbers = env.generate_sequence(1, 10000000)
+{% endhighlight %}
+
+{% top %}
+
+Data Sinks
+----------
+
+Data sinks consume DataSets and are used to store or return them:
+
+- `write_text()` - Writes elements line-wise as Strings. The Strings are
+  obtained by calling the *str()* method of each element.
+- `write_csv(...)` - Writes tuples as comma-separated value files. Row and 
field
+  delimiters are configurable. The value for each field comes from the *str()* 
method of the objects.
+- `output()` - Prints the *str()* value of each element on the
+  standard out.
+
+A DataSet can be input to multiple operations. Programs can write or print a 
data set and at the
+same time run additional transformations on them.
+
+**Examples**
+
+Standard data sink methods:
+
+{% highlight scala %}
+ write DataSet to a file on the local file system
+textData.write_text("file:///my/result/on/localFS")
+
+ write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
+textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")
+
+ write DataSet to a file and overwrite the file if it exists
+textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)
+
+ tuples as lines with pipe as the separator "a|b|c"
+values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", 
field_delimiter="|")
+
+ this writes tuples in the text formatting "(a, b, c)", rather than as CSV 
lines
+values.write_text("file:///path/to/the/result/file")
+{% endhighlight %}
+
+{% top %}
+
+Broadcast Variables
+-------------------
+
+Broadcast variables allow you to make a data set available to all parallel 
instances of an
+operation, in addition to the regular input of the operation. This is useful 
for auxiliary data
+sets, or data-dependent parameterization. The data set will then be accessible 
at the operator as a
+Collection.
+
+- **Broadcast**: broadcast sets are registered by name via 
`with_broadcast_set(DataSet, String)`
+- **Access**: accessible via `self.context.get_broadcast_variable(String)` at 
the target operator
+
+{% highlight python %}
+class MapperBcv(MapFunction):
+    def map(self, value):
+        factor = self.context.get_broadcast_variable("bcv")[0][0]
+        return value * factor
+
+# 1. The DataSet to be broadcasted
+toBroadcast = env.from_elements(1, 2, 3)
+data = env.from_elements("a", "b")
+
+# 2. Broadcast the DataSet
+data.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast)
+{% endhighlight %}
+
+Make sure that the names (`bcv` in the previous example) match when 
registering and
+accessing broadcasted data sets.
+
+**Note**: As the content of broadcast variables is kept in-memory on each 
node, it should not become
+too large. For simpler things like scalar values you can simply parameterize 
the rich function.
+
+{% top %}
+
+Parallel Execution
+------------------
+
+This section describes how the parallel execution of programs can be 
configured in Flink. A Flink
+program consists of multiple tasks (operators, data sources, and sinks). A 
task is split into
+several parallel instances for execution and each parallel instance processes 
a subset of the task's
+input data. The number of parallel instances of a task is called its 
*parallelism* or *degree of
+parallelism (DOP)*.
+
+The degree of parallelism of a task can be specified in Flink on different 
levels.
+
+### Execution Environment Level
+
+Flink programs are executed in the context of an [execution 
environment](#program-skeleton). An
+execution environment defines a default parallelism for all operators, data 
sources, and data sinks
+it executes. Execution environment parallelism can be overwritten by 
explicitly configuring the
+parallelism of an operator.
+
+The default parallelism of an execution environment can be specified by 
calling the
+`set_parallelism()` method. To execute all operators, data sources, and data 
sinks of the
+[WordCount](#example-program) example program with a parallelism of `3`, set 
the default parallelism of the
+execution environment as follows:
+
+{% highlight python %}
+env = get_environment()
+env.set_parallelism(3)
+
+text.flat_map(lambda x,c: x.lower().split()) \
+    .group_by(1) \
+    .reduce_group(Adder(), combinable=True) \
+    .output()
+
+env.execute()
+{% endhighlight %}
+
+### System Level
+
+A system-wide default parallelism for all execution environments can be 
defined by setting the
+`parallelism.default` property in `./conf/flink-conf.yaml`. See the
+[Configuration]({{ site.baseurl }}/setup/config.html) documentation for 
details.
+
+{% top %}
+
+Executing Plans
+---------------
+
+To run the plan with Flink, go to your Flink distribution, and run the 
pyflink.sh script from the /bin folder.
+use pyflink2.sh for python 2.7, and pyflink3.sh for python 3.4. The script 
containing the plan has to be passed
+as the first argument, followed by a number of additional python packages, and 
finally, separated by - additional
+arguments that will be fed to the script.
+
+{% highlight python %}
+./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - 
<param1>[ <paramX>]]
+{% endhighlight %}
+
+{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/batch/zip_elements_guide.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/zip_elements_guide.md 
b/docs/dev/batch/zip_elements_guide.md
new file mode 100644
index 0000000..40edb8d
--- /dev/null
+++ b/docs/dev/batch/zip_elements_guide.md
@@ -0,0 +1,126 @@
+---
+title: "Zipping Elements in a DataSet"
+nav-title: Zipping Elements
+nav-parent_id: batch
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+In certain algorithms, one may need to assign unique identifiers to data set 
elements.
+This document shows how {% gh_link 
/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java 
"DataSetUtils" %} can be used for that purpose.
+
+* This will be replaced by the TOC
+{:toc}
+
+### Zip with a Dense Index
+`zipWithIndex` assigns consecutive labels to the elements, receiving a data 
set as input and returning a new data set of `(unique id, initial value)` 
2-tuples.
+This process requires two passes, first counting then labeling elements, and 
cannot be pipelined due to the synchronization of counts.
+The alternative `zipWithUniqueId` works in a pipelined fashion and is 
preferred when a unique labeling is sufficient.
+For example, the following code:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(2);
+DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");
+
+DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithIndex(in);
+
+result.writeAsCsv(resultPath, "\n", ",");
+env.execute();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
+val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", 
"G", "H")
+
+val result: DataSet[(Long, String)] = input.zipWithIndex
+
+result.writeAsCsv(resultPath, "\n", ",")
+env.execute()
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight python %}
+from flink.plan.Environment import get_environment
+
+env = get_environment()
+env.set_parallelism(2)
+input = env.from_elements("A", "B", "C", "D", "E", "F", "G", "H")
+
+result = input.zipWithIndex()
+
+result.write_text(result_path)
+env.execute()
+{% endhighlight %}
+</div>
+
+</div>
+
+may yield the tuples: (0,G), (1,H), (2,A), (3,B), (4,C), (5,D), (6,E), (7,F)
+
+[Back to top](#top)
+
+### Zip with a Unique Identifier
+In many cases one may not need to assign consecutive labels.
+`zipWithUniqueId` works in a pipelined fashion, speeding up the label 
assignment process. This method receives a data set as input and returns a new 
data set of `(unique id, initial value)` 2-tuples.
+For example, the following code:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(2);
+DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");
+
+DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithUniqueId(in);
+
+result.writeAsCsv(resultPath, "\n", ",");
+env.execute();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
+val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", 
"G", "H")
+
+val result: DataSet[(Long, String)] = input.zipWithUniqueId
+
+result.writeAsCsv(resultPath, "\n", ",")
+env.execute()
+{% endhighlight %}
+</div>
+
+</div>
+
+may yield the tuples: (0,G), (1,A), (2,H), (3,B), (5,C), (7,D), (9,E), (11,F)
+
+[Back to top](#top)

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/cluster_execution.md
----------------------------------------------------------------------
diff --git a/docs/dev/cluster_execution.md b/docs/dev/cluster_execution.md
new file mode 100644
index 0000000..31b4d4a
--- /dev/null
+++ b/docs/dev/cluster_execution.md
@@ -0,0 +1,155 @@
+---
+title:  "Cluster Execution"
+nav-parent_id: dev
+nav-pos: 12
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+Flink programs can run distributed on clusters of many machines. There
+are two ways to send a program to a cluster for execution:
+
+## Command Line Interface
+
+The command line interface lets you submit packaged programs (JARs) to a 
cluster
+(or single machine setup).
+
+Please refer to the [Command Line Interface]({{ site.baseurl 
}}/setup/cli.html) documentation for
+details.
+
+## Remote Environment
+
+The remote environment lets you execute Flink Java programs on a cluster
+directly. The remote environment points to the cluster on which you want to
+execute the program.
+
+### Maven Dependency
+
+If you are developing your program as a Maven project, you have to add the
+`flink-clients` module using this dependency:
+
+~~~xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+~~~
+
+### Example
+
+The following illustrates the use of the `RemoteEnvironment`:
+
+~~~java
+public static void main(String[] args) throws Exception {
+    ExecutionEnvironment env = ExecutionEnvironment
+        .createRemoteEnvironment("flink-master", 6123, "/home/user/udfs.jar");
+
+    DataSet<String> data = env.readTextFile("hdfs://path/to/file");
+
+    data
+        .filter(new FilterFunction<String>() {
+            public boolean filter(String value) {
+                return value.startsWith("http://";);
+            }
+        })
+        .writeAsText("hdfs://path/to/result");
+
+    env.execute();
+}
+~~~
+
+Note that the program contains custom user code and hence requires a JAR file 
with
+the classes of the code attached. The constructor of the remote environment
+takes the path(s) to the JAR file(s).
+
+## Linking with modules not contained in the binary distribution
+
+The binary distribution contains jar packages in the `lib` folder that are 
automatically
+provided to the classpath of your distributed programs. Almost all of Flink 
classes are
+located there with a few exceptions, for example the streaming connectors and 
some freshly
+added modules. To run code depending on these modules you need to make them 
accessible
+during runtime, for which we suggest two options:
+
+1. Either copy the required jar files to the `lib` folder onto all of your 
TaskManagers.
+Note that you have to restart your TaskManagers after this.
+2. Or package them with your code.
+
+The latter version is recommended as it respects the classloader management in 
Flink.
+
+### Packaging dependencies with your usercode with Maven
+
+To provide these dependencies not included by Flink we suggest two options 
with Maven.
+
+1. The maven assembly plugin builds a so-called uber-jar (executable jar) 
containing all your dependencies.
+The assembly configuration is straight-forward, but the resulting jar might 
become bulky.
+See 
[maven-assembly-plugin](http://maven.apache.org/plugins/maven-assembly-plugin/usage.html)
 for further information.
+2. The maven unpack plugin unpacks the relevant parts of the dependencies and
+then packages it with your code.
+
+Using the latter approach in order to bundle the Kafka connector, 
`flink-connector-kafka`
+you would need to add the classes from both the connector and the Kafka API 
itself. Add
+the following to your plugins section.
+
+~~~xml
+<plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-dependency-plugin</artifactId>
+    <version>2.9</version>
+    <executions>
+        <execution>
+            <id>unpack</id>
+            <!-- executed just before the package phase -->
+            <phase>prepare-package</phase>
+            <goals>
+                <goal>unpack</goal>
+            </goals>
+            <configuration>
+                <artifactItems>
+                    <!-- For Flink connector classes -->
+                    <artifactItem>
+                        <groupId>org.apache.flink</groupId>
+                        <artifactId>flink-connector-kafka</artifactId>
+                        <version>{{ site.version }}</version>
+                        <type>jar</type>
+                        <overWrite>false</overWrite>
+                        
<outputDirectory>${project.build.directory}/classes</outputDirectory>
+                        <includes>org/apache/flink/**</includes>
+                    </artifactItem>
+                    <!-- For Kafka API classes -->
+                    <artifactItem>
+                        <groupId>org.apache.kafka</groupId>
+                        <artifactId>kafka_<YOUR_SCALA_VERSION></artifactId>
+                        <version><YOUR_KAFKA_VERSION></version>
+                        <type>jar</type>
+                        <overWrite>false</overWrite>
+                        
<outputDirectory>${project.build.directory}/classes</outputDirectory>
+                        <includes>kafka/**</includes>
+                    </artifactItem>
+                </artifactItems>
+            </configuration>
+        </execution>
+    </executions>
+</plugin>
+~~~
+
+Now when running `mvn clean package` the produced jar includes the required 
dependencies.

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/connectors/cassandra.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/cassandra.md b/docs/dev/connectors/cassandra.md
new file mode 100644
index 0000000..90be0e3
--- /dev/null
+++ b/docs/dev/connectors/cassandra.md
@@ -0,0 +1,155 @@
+---
+title: "Apache Cassandra Connector"
+nav-title: Cassandra
+nav-parent_id: connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This connector provides sinks that writes data into a 
[Cassandra](https://cassandra.apache.org/) database.
+
+To use this connector, add the following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-cassandra{{ site.scala_version_suffix 
}}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+#### Installing Apache Cassandra
+Follow the instructions from the [Cassandra Getting Started 
page](http://wiki.apache.org/cassandra/GettingStarted).
+
+#### Cassandra Sink
+
+Flink's Cassandra sink are created by using the static 
CassandraSink.addSink(DataStream<IN> input) method.
+This method returns a CassandraSinkBuilder, which offers methods to further 
configure the sink.
+
+The following configuration methods can be used:
+
+1. setQuery(String query)
+2. setHost(String host[, int port])
+3. setClusterBuilder(ClusterBuilder builder)
+4. enableWriteAheadLog([CheckpointCommitter committer])
+5. build()
+
+*setQuery()* sets the query that is executed for every value the sink receives.
+*setHost()* sets the cassandra host/port to connect to. This method is 
intended for simple use-cases.
+*setClusterBuilder()* sets the cluster builder that is used to configure the 
connection to cassandra. The *setHost()* functionality can be subsumed with 
this method.
+*enableWriteAheadLog()* is an optional method, that allows exactly-once 
processing for non-deterministic algorithms.
+
+A checkpoint committer stores additional information about completed 
checkpoints
+in some resource. This information is used to prevent a full replay of the last
+completed checkpoint in case of a failure.
+You can use a `CassandraCommitter` to store these in a separate table in 
cassandra.
+Note that this table will NOT be cleaned up by Flink.
+
+*build()* finalizes the configuration and returns the CassandraSink.
+
+Flink can provide exactly-once guarantees if the query is idempotent (meaning 
it can be applied multiple
+times without changing the result) and checkpointing is enabled. In case of a 
failure the failed
+checkpoint will be replayed completely.
+
+Furthermore, for non-deterministic programs the write-ahead log has to be 
enabled. For such a program
+the replayed checkpoint may be completely different than the previous attempt, 
which may leave the
+database in an inconsitent state since part of the first attempt may already 
be written.
+The write-ahead log guarantees that the replayed checkpoint is identical to 
the first attempt.
+Note that that enabling this feature will have an adverse impact on latency.
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The 
write-ahead log functionality is currently experimental. In many cases it is 
sufficent to use the connector without enabling it. Please report problems to 
the development mailing list.</p>
+
+
+#### Example
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+CassandraSink.addSink(input)
+  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
+  .setClusterBuilder(new ClusterBuilder() {
+    @Override
+    public Cluster buildCluster(Cluster.Builder builder) {
+      return builder.addContactPoint("127.0.0.1").build();
+    }
+  })
+  .build();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+CassandraSink.addSink(input)
+  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
+  .setClusterBuilder(new ClusterBuilder() {
+    @Override
+    public Cluster buildCluster(Cluster.Builder builder) {
+      return builder.addContactPoint("127.0.0.1").build();
+    }
+  })
+  .build();
+{% endhighlight %}
+</div>
+</div>
+
+The Cassandra sinks support both tuples and POJO's that use DataStax 
annotations.
+Flink automatically detects which type of input is used.
+
+Example for such a Pojo:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+@Table(keyspace= "test", name = "mappersink")
+public class Pojo implements Serializable {
+
+       private static final long serialVersionUID = 1038054554690916991L;
+
+       @Column(name = "id")
+       private long id;
+       @Column(name = "value")
+       private String value;
+
+       public Pojo(long id, String value){
+               this.id = id;
+               this.value = value;
+       }
+
+       public long getId() {
+               return id;
+       }
+
+       public void setId(long id) {
+               this.id = id;
+       }
+
+       public String getValue() {
+               return value;
+       }
+
+       public void setValue(String value) {
+               this.value = value;
+       }
+}
+{% endhighlight %}
+</div>
+</div>

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch.md 
b/docs/dev/connectors/elasticsearch.md
new file mode 100644
index 0000000..be45f98
--- /dev/null
+++ b/docs/dev/connectors/elasticsearch.md
@@ -0,0 +1,180 @@
+---
+title: "Elasticsearch Connector"
+nav-title: Elasticsearch
+nav-parent_id: connectors
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This connector provides a Sink that can write to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix 
}}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
+for information about how to package the program with the libraries for
+cluster execution.
+
+#### Installing Elasticsearch
+
+Instructions for setting up an Elasticsearch cluster can be found
+[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
+Make sure to set and remember a cluster name. This must be set when
+creating a Sink for writing to your cluster
+
+#### Elasticsearch Sink
+The connector provides a Sink that can send data to an Elasticsearch Index.
+
+The sink can use two different methods for communicating with Elasticsearch:
+
+1. An embedded Node
+2. The TransportClient
+
+See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
+for information about the differences between the two modes.
+
+This code shows how to create a sink that uses an embedded Node for
+communication:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config = Maps.newHashMap();
+// This instructs the sink to emit after every element, otherwise they would 
be buffered
+config.put("bulk.flush.max.actions", "1");
+config.put("cluster.name", "my-cluster-name");
+
+input.addSink(new ElasticsearchSink<>(config, new 
IndexRequestBuilder<String>() {
+    @Override
+    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) 
{
+        Map<String, Object> json = new HashMap<>();
+        json.put("data", element);
+
+        return Requests.indexRequest()
+                .index("my-index")
+                .type("my-type")
+                .source(json);
+    }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val config = new util.HashMap[String, String]
+config.put("bulk.flush.max.actions", "1")
+config.put("cluster.name", "my-cluster-name")
+
+text.addSink(new ElasticsearchSink(config, new IndexRequestBuilder[String] {
+  override def createIndexRequest(element: String, ctx: RuntimeContext): 
IndexRequest = {
+    val json = new util.HashMap[String, AnyRef]
+    json.put("data", element)
+    println("SENDING: " + element)
+    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
+  }
+}))
+{% endhighlight %}
+</div>
+</div>
+
+Note how a Map of Strings is used to configure the Sink. The configuration keys
+are documented in the Elasticsearch documentation
+[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
+Especially important is the `cluster.name` parameter that must correspond to
+the name of your cluster.
+
+Internally, the sink uses a `BulkProcessor` to send index requests to the 
cluster.
+This will buffer elements before sending a request to the cluster. The 
behaviour of the
+`BulkProcessor` can be configured using these config keys:
+ * **bulk.flush.max.actions**: Maximum amount of elements to buffer
+ * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
+ * **bulk.flush.interval.ms**: Interval at which to flush data regardless of 
the other two
+  settings in milliseconds
+
+This example code does the same, but with a `TransportClient`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config = Maps.newHashMap();
+// This instructs the sink to emit after every element, otherwise they would 
be buffered
+config.put("bulk.flush.max.actions", "1");
+config.put("cluster.name", "my-cluster-name");
+
+List<TransportAddress> transports = new ArrayList<String>();
+transports.add(new InetSocketTransportAddress("node-1", 9300));
+transports.add(new InetSocketTransportAddress("node-2", 9300));
+
+input.addSink(new ElasticsearchSink<>(config, transports, new 
IndexRequestBuilder<String>() {
+    @Override
+    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) 
{
+        Map<String, Object> json = new HashMap<>();
+        json.put("data", element);
+
+        return Requests.indexRequest()
+                .index("my-index")
+                .type("my-type")
+                .source(json);
+    }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val config = new util.HashMap[String, String]
+config.put("bulk.flush.max.actions", "1")
+config.put("cluster.name", "my-cluster-name")
+
+val transports = new ArrayList[String]
+transports.add(new InetSocketTransportAddress("node-1", 9300))
+transports.add(new InetSocketTransportAddress("node-2", 9300))
+
+text.addSink(new ElasticsearchSink(config, transports, new 
IndexRequestBuilder[String] {
+  override def createIndexRequest(element: String, ctx: RuntimeContext): 
IndexRequest = {
+    val json = new util.HashMap[String, AnyRef]
+    json.put("data", element)
+    println("SENDING: " + element)
+    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
+  }
+}))
+{% endhighlight %}
+</div>
+</div>
+
+The difference is that we now need to provide a list of Elasticsearch Nodes
+to which the sink should connect using a `TransportClient`.
+
+More information about Elasticsearch can be found [here](https://elastic.co).

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/connectors/elasticsearch2.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch2.md 
b/docs/dev/connectors/elasticsearch2.md
new file mode 100644
index 0000000..8eed690
--- /dev/null
+++ b/docs/dev/connectors/elasticsearch2.md
@@ -0,0 +1,141 @@
+---
+title: "Elasticsearch 2.x Connector"
+nav-title: Elasticsearch 2.x
+nav-parent_id: connectors
+nav-pos: 5
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This connector provides a Sink that can write to an
+[Elasticsearch 2.x](https://elastic.co/) Index. To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-elasticsearch2{{ site.scala_version_suffix 
}}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
+for information about how to package the program with the libraries for
+cluster execution.
+
+#### Installing Elasticsearch 2.x
+
+Instructions for setting up an Elasticsearch cluster can be found
+[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
+Make sure to set and remember a cluster name. This must be set when
+creating a Sink for writing to your cluster
+
+#### Elasticsearch 2.x Sink
+The connector provides a Sink that can send data to an Elasticsearch 2.x Index.
+
+The sink communicates with Elasticsearch via Transport Client
+
+See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html)
+for information about the Transport Client.
+
+The code below shows how to create a sink that uses a `TransportClient` for 
communication:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+File dataDir = ....;
+
+DataStream<String> input = ...;
+
+Map<String, String> config = new HashMap<>();
+// This instructs the sink to emit after every element, otherwise they would 
be buffered
+config.put("bulk.flush.max.actions", "1");
+config.put("cluster.name", "my-cluster-name");
+
+List<InetSocketAddress> transports = new ArrayList<>();
+transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 
9300));
+transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
+
+input.addSink(new ElasticsearchSink(config, transports, new 
ElasticsearchSinkFunction<String>() {
+  public IndexRequest createIndexRequest(String element) {
+    Map<String, String> json = new HashMap<>();
+    json.put("data", element);
+
+    return Requests.indexRequest()
+            .index("my-index")
+            .type("my-type")
+            .source(json);
+  }
+
+  @Override
+  public void process(String element, RuntimeContext ctx, RequestIndexer 
indexer) {
+    indexer.add(createIndexRequest(element));
+  }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val dataDir = ....;
+
+val input: DataStream[String] = ...
+
+val config = new util.HashMap[String, String]
+config.put("bulk.flush.max.actions", "1")
+config.put("cluster.name", "my-cluster-name")
+
+val transports = new ArrayList[String]
+transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
+transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
+
+input.addSink(new ElasticsearchSink(config, transports, new 
ElasticsearchSinkFunction[String] {
+  def createIndexRequest(element: String): IndexRequest = {
+    val json = new util.HashMap[String, AnyRef]
+    json.put("data", element)
+    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
+  }
+
+  override def process(element: String, ctx: RuntimeContext, indexer: 
RequestIndexer) {
+    indexer.add(createIndexRequest(element))
+  }
+}))
+{% endhighlight %}
+</div>
+</div>
+
+A Map of Strings is used to configure the Sink. The configuration keys
+are documented in the Elasticsearch documentation
+[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
+Especially important is the `cluster.name`. parameter that must correspond to
+the name of your cluster and with ElasticSearch 2x you also need to specify 
`path.home`.
+
+Internally, the sink uses a `BulkProcessor` to send Action requests to the 
cluster.
+This will buffer elements and Action Requests before sending to the cluster. 
The behaviour of the
+`BulkProcessor` can be configured using these config keys:
+ * **bulk.flush.max.actions**: Maximum amount of elements to buffer
+ * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
+ * **bulk.flush.interval.ms**: Interval at which to flush data regardless of 
the other two
+  settings in milliseconds
+
+This now provides a list of Elasticsearch Nodes
+to which the sink should connect via a `TransportClient`.
+
+More information about Elasticsearch can be found [here](https://elastic.co).

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/connectors/filesystem_sink.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/filesystem_sink.md 
b/docs/dev/connectors/filesystem_sink.md
new file mode 100644
index 0000000..c6318e8
--- /dev/null
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -0,0 +1,130 @@
+---
+title: "HDFS Connector"
+nav-title: Rolling File Sink
+nav-parent_id: connectors
+nav-pos: 6
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This connector provides a Sink that writes rolling files to any filesystem 
supported by
+Hadoop FileSystem. To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-filesystem{{ site.scala_version_suffix 
}}</artifactId>
+  <version>{{site.version}}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
+for information about how to package the program with the libraries for
+cluster execution.
+
+#### Rolling File Sink
+
+The rolling behaviour as well as the writing can be configured but we will get 
to that later.
+This is how you can create a default rolling sink:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+input.addSink(new RollingSink<String>("/base/path"));
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+input.addSink(new RollingSink("/base/path"))
+
+{% endhighlight %}
+</div>
+</div>
+
+The only required parameter is the base path where the rolling files (buckets) 
will be
+stored. The sink can be configured by specifying a custom bucketer, writer and 
batch size.
+
+By default the rolling sink will use the pattern `"yyyy-MM-dd--HH"` to name 
the rolling buckets.
+This pattern is passed to `SimpleDateFormat` with the current system time to 
form a bucket path. A
+new bucket will be created whenever the bucket path changes. For example, if 
you have a pattern
+that contains minutes as the finest granularity you will get a new bucket 
every minute.
+Each bucket is itself a directory that contains several part files: Each 
parallel instance
+of the sink will create its own part file and when part files get too big the 
sink will also
+create a new part file next to the others. To specify a custom bucketer use 
`setBucketer()`
+on a `RollingSink`.
+
+The default writer is `StringWriter`. This will call `toString()` on the 
incoming elements
+and write them to part files, separated by newline. To specify a custom writer 
use `setWriter()`
+on a `RollingSink`. If you want to write Hadoop SequenceFiles you can use the 
provided
+`SequenceFileWriter` which can also be configured to use compression.
+
+The last configuration option is the batch size. This specifies when a part 
file should be closed
+and a new one started. (The default part file size is 384 MB).
+
+Example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<IntWritable,Text>> input = ...;
+
+RollingSink sink = new RollingSink<String>("/base/path");
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
+sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
+sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
+
+input.addSink(sink);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[Tuple2[IntWritable, Text]] = ...
+
+val sink = new RollingSink[String]("/base/path")
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
+sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
+
+input.addSink(sink)
+
+{% endhighlight %}
+</div>
+</div>
+
+This will create a sink that writes to bucket files that follow this schema:
+
+```
+/base/path/{date-time}/part-{parallel-task}-{count}
+```
+
+Where `date-time` is the string that we get from the date/time format, 
`parallel-task` is the index
+of the parallel sink instance and `count` is the running number of part files 
that where created
+because of the batch size.
+
+For in-depth information, please refer to the JavaDoc for
+[RollingSink](http://flink.apache.org/docs/latest/api/java/org/apache/flink/streaming/connectors/fs/RollingSink.html).

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/connectors/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/index.md b/docs/dev/connectors/index.md
new file mode 100644
index 0000000..c49c8c2
--- /dev/null
+++ b/docs/dev/connectors/index.md
@@ -0,0 +1,46 @@
+---
+title: "Streaming Connectors"
+nav-id: connectors
+nav-title: Connectors
+nav-parent_id: dev
+nav-pos: 7
+nav-show_overview: true
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Connectors provide code for interfacing with various third-party systems.
+
+Currently these systems are supported:
+
+ * [Apache Kafka](https://kafka.apache.org/) (sink/source)
+ * [Elasticsearch](https://elastic.co/) (sink)
+ * [Elasticsearch 2x](https://elastic.com) (sink)
+ * [Hadoop FileSystem](http://hadoop.apache.org) (sink)
+ * [RabbitMQ](http://www.rabbitmq.com/) (sink/source)
+ * [Amazon Kinesis Streams](http://aws.amazon.com/kinesis/streams/) 
(sink/source)
+ * [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis) 
(source)
+ * [Apache NiFi](https://nifi.apache.org) (sink/source)
+ * [Apache Cassandra](https://cassandra.apache.org/) (sink)
+ * [Redis](http://redis.io/) (sink)
+
+To run an application using one of these connectors, additional third party
+components are usually required to be installed and launched, e.g. the servers
+for the message queues. Further instructions for these can be found in the
+corresponding subsections.

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
new file mode 100644
index 0000000..d2221fa
--- /dev/null
+++ b/docs/dev/connectors/kafka.md
@@ -0,0 +1,289 @@
+---
+title: "Apache Kafka Connector"
+nav-title: Kafka
+nav-parent_id: connectors
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This connector provides access to event streams served by [Apache 
Kafka](https://kafka.apache.org/).
+
+Flink provides special Kafka Connectors for reading and writing data from/to 
Kafka topics.
+The Flink Kafka Consumer integrates with Flink's checkpointing mechanism to 
provide
+exactly-once processing semantics. To achieve that, Flink does not purely rely 
on Kafka's consumer group
+offset tracking, but tracks and checkpoints these offsets internally as well.
+
+Please pick a package (maven artifact id) and class name for your use-case and 
environment.
+For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) 
is appropriate.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Maven Dependency</th>
+      <th class="text-left">Supported since</th>
+      <th class="text-left">Consumer and <br>
+      Producer Class name</th>
+      <th class="text-left">Kafka version</th>
+      <th class="text-left">Notes</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>flink-connector-kafka</td>
+        <td>0.9.1, 0.10</td>
+        <td>FlinkKafkaConsumer082<br>
+        FlinkKafkaProducer</td>
+        <td>0.8.x</td>
+        <td>Uses the <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example";>SimpleConsumer</a>
 API of Kafka internally. Offsets are committed to ZK by Flink.</td>
+    </tr>
+     <tr>
+        <td>flink-connector-kafka-0.8{{ site.scala_version_suffix }}</td>
+        <td>1.0.0</td>
+        <td>FlinkKafkaConsumer08<br>
+        FlinkKafkaProducer08</td>
+        <td>0.8.x</td>
+        <td>Uses the <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example";>SimpleConsumer</a>
 API of Kafka internally. Offsets are committed to ZK by Flink.</td>
+    </tr>
+     <tr>
+        <td>flink-connector-kafka-0.9{{ site.scala_version_suffix }}</td>
+        <td>1.0.0</td>
+        <td>FlinkKafkaConsumer09<br>
+        FlinkKafkaProducer09</td>
+        <td>0.9.x</td>
+        <td>Uses the new <a 
href="http://kafka.apache.org/documentation.html#newconsumerapi";>Consumer 
API</a> Kafka.</td>
+    </tr>
+  </tbody>
+</table>
+
+Then, import the connector in your maven project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka-0.8{{ site.scala_version_suffix 
}}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+### Installing Apache Kafka
+
+* Follow the instructions from [Kafka's 
quickstart](https://kafka.apache.org/documentation.html#quickstart) to download 
the code and launch a server (launching a Zookeeper and a Kafka server is 
required every time before starting the application).
+* On 32 bit computers 
[this](http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in)
 problem may occur.
+* If the Kafka and Zookeeper servers are running on a remote machine, then the 
`advertised.host.name` setting in the `config/server.properties` file must be 
set to the machine's IP address.
+
+### Kafka Consumer
+
+Flink's Kafka consumer is called `FlinkKafkaConsumer08` (or `09` for Kafka 
0.9.0.x versions). It provides access to one or more Kafka topics.
+
+The constructor accepts the following arguments:
+
+1. The topic name / list of topic names
+2. A DeserializationSchema / KeyedDeserializationSchema for deserializing the 
data from Kafka
+3. Properties for the Kafka consumer.
+  The following properties are required:
+  - "bootstrap.servers" (comma separated list of Kafka brokers)
+  - "zookeeper.connect" (comma separated list of Zookeeper servers) (**only 
required for Kafka 0.8**)
+  - "group.id" the id of the consumer group
+
+Example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+// only required for Kafka 0.8
+properties.setProperty("zookeeper.connect", "localhost:2181");
+properties.setProperty("group.id", "test");
+DataStream<String> stream = env
+       .addSource(new FlinkKafkaConsumer08<>("topic", new 
SimpleStringSchema(), properties))
+       .print();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+// only required for Kafka 0.8
+properties.setProperty("zookeeper.connect", "localhost:2181");
+properties.setProperty("group.id", "test");
+stream = env
+    .addSource(new FlinkKafkaConsumer08[String]("topic", new 
SimpleStringSchema(), properties))
+    .print
+{% endhighlight %}
+</div>
+</div>
+
+The current FlinkKafkaConsumer implementation will establish a connection from 
the client (when calling the constructor)
+for querying the list of topics and partitions.
+
+For this to work, the consumer needs to be able to access the consumers from 
the machine submitting the job to the Flink cluster.
+If you experience any issues with the Kafka consumer on the client side, the 
client log might contain information about failed requests, etc.
+
+#### The `DeserializationSchema`
+
+The Flink Kafka Consumer needs to know how to turn the binary data in Kafka 
into Java/Scala objects. The
+`DeserializationSchema` allows users to specify such a schema. The `T 
deserialize(byte[] message)`
+method gets called for each Kafka message, passing the value from Kafka.
+
+It is usually helpful to start from the `AbstractDeserializationSchema`, which 
takes care of describing the
+produced Java/Scala type to Flink's type system. Users that implement a 
vanilla `DeserializationSchema` need
+to implement the `getProducedType(...)` method themselves.
+
+For accessing both the key and value of the Kafka message, the 
`KeyedDeserializationSchema` has
+the following deserialize method ` T deserialize(byte[] messageKey, byte[] 
message, String topic, int partition, long offset)`.
+
+For convenience, Flink provides the following schemas:
+
+1. `TypeInformationSerializationSchema` (and 
`TypeInformationKeyValueSerializationSchema`) which creates
+    a schema based on a Flink's `TypeInformation`. This is useful if the data 
is both written and read by Flink.
+    This schema is a performant Flink-specific alternative to other generic 
serialization approaches.
+
+2. `JsonDeserializationSchema` (and `JSONKeyValueDeserializationSchema`) which 
turns the serialized JSON
+    into an ObjectNode object, from which fields can be accessed using 
objectNode.get("field").as(Int/String/...)().
+    The KeyValue objectNode contains a "key" and "value" field which contain 
all fields, as well as
+    an optional "metadata" field that exposes the offset/partition/topic for 
this message.
+
+#### Kafka Consumers and Fault Tolerance
+
+With Flink's checkpointing enabled, the Flink Kafka Consumer will consume 
records from a topic and periodically checkpoint all
+its Kafka offsets, together with the state of other operations, in a 
consistent manner. In case of a job failure, Flink will restore
+the streaming program to the state of the latest checkpoint and re-consume the 
records from Kafka, starting from the offsets that where
+stored in the checkpoint.
+
+The interval of drawing checkpoints therefore defines how much the program may 
have to go back at most, in case of a failure.
+
+To use fault tolerant Kafka Consumers, checkpointing of the topology needs to 
be enabled at the execution environment:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+</div>
+</div>
+
+Also note that Flink can only restart the topology if enough processing slots 
are available to restart the topology.
+So if the topology fails due to loss of a TaskManager, there must still be 
enough slots available afterwards.
+Flink on YARN supports automatic restart of lost YARN containers.
+
+If checkpointing is not enabled, the Kafka consumer will periodically commit 
the offsets to Zookeeper.
+
+#### Kafka Consumers and Timestamp Extraction/Watermark Emission
+
+In many scenarios, the timestamp of a record is embedded (explicitly or 
implicitly) in the record itself.
+In addition, the user may want to emit watermarks either periodically, or in 
an irregular fashion, e.g. based on
+special records in the Kafka stream that contain the current event-time 
watermark. For these cases, the Flink Kafka
+Consumer allows the specification of an `AssignerWithPeriodicWatermarks` or an 
`AssignerWithPunctuatedWatermarks`.
+
+You can specify your custom timestamp extractor/watermark emitter as described
+[here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html), or 
use one from the
+[predefined ones]({{ site.baseurl 
}}/apis/streaming/event_timestamp_extractors.html). After doing so, you
+can pass it to your consumer in the following way:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+// only required for Kafka 0.8
+properties.setProperty("zookeeper.connect", "localhost:2181");
+properties.setProperty("group.id", "test");
+
+FlinkKafkaConsumer08<String> myConsumer =
+    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
+myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
+
+DataStream<String> stream = env
+       .addSource(myConsumer)
+       .print();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+// only required for Kafka 0.8
+properties.setProperty("zookeeper.connect", "localhost:2181");
+properties.setProperty("group.id", "test");
+
+val myConsumer = new FlinkKafkaConsumer08[String]("topic", new 
SimpleStringSchema(), properties);
+myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
+stream = env
+    .addSource(myConsumer)
+    .print
+{% endhighlight %}
+</div>
+</div>
+
+Internally, an instance of the assigner is executed per Kafka partition.
+When such an assigner is specified, for each record read from Kafka, the
+`extractTimestamp(T element, long previousElementTimestamp)` is called to 
assign a timestamp to the record and
+the `Watermark getCurrentWatermark()` (for periodic) or the
+`Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` 
(for punctuated) is called to determine
+if a new watermark should be emitted and with which timestamp.
+
+### Kafka Producer
+
+The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can 
specify a custom partitioner that assigns
+records to partitions.
+
+Example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", 
new SimpleStringSchema()));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", 
new SimpleStringSchema()))
+{% endhighlight %}
+</div>
+</div>
+
+You can also define a custom Kafka producer configuration for the KafkaSink 
with the constructor. Please refer to
+the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) 
for details on how to configure
+Kafka Producers.
+
+Similar to the consumer, the producer also allows using an advanced 
serialization schema which allows
+serializing the key and value separately. It also allows to override the 
target topic id, so that
+one producer instance can send data to multiple topics.
+
+The interface of the serialization schema is called `KeyedSerializationSchema`.
+
+
+**Note**: By default, the number of retries is set to "0". This means that the 
producer fails immediately on errors,
+including leader changes. The value is set to "0" by default to avoid 
duplicate messages in the target topic.
+For most production environments with frequent broker changes, we recommend 
setting the number of retries to a
+higher value.
+
+There is currently no transactional producer for Kafka, so Flink can not 
guarantee exactly-once delivery
+into a Kafka topic.

Reply via email to