http://git-wip-us.apache.org/repos/asf/flink-web/blob/f0ac0cdb/content/docs/0.9/apis/streaming_guide.html
----------------------------------------------------------------------
diff --git a/content/docs/0.9/apis/streaming_guide.html 
b/content/docs/0.9/apis/streaming_guide.html
deleted file mode 100644
index f0ba539..0000000
--- a/content/docs/0.9/apis/streaming_guide.html
+++ /dev/null
@@ -1,1944 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<!DOCTYPE html>
-
-<html lang="en">
-  <head>
-    <meta charset="utf-8">
-    <meta http-equiv="X-UA-Compatible" content="IE=edge">
-    <meta name="viewport" content="width=device-width, initial-scale=1">
-    <!-- The above 3 meta tags *must* come first in the head; any other head 
content must come *after* these tags -->
-    
-    <title>Apache Flink 0.9.0 Documentation: Flink Stream Processing 
API</title>
-    
-    <link rel="shortcut icon" 
href="http://flink.apache.org/docs/0.9/page/favicon.ico"; type="image/x-icon">
-    <link rel="icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico"; 
type="image/x-icon">
-
-    <!-- Bootstrap -->
-    <link rel="stylesheet" 
href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css";>
-    <link rel="stylesheet" 
href="http://flink.apache.org/docs/0.9/page/css/flink.css";>
-    <link rel="stylesheet" 
href="http://flink.apache.org/docs/0.9/page/css/syntax.css";>
-    <link rel="stylesheet" 
href="http://flink.apache.org/docs/0.9/page/css/codetabs.css";>
-    
-    <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media 
queries -->
-    <!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
-    <!--[if lt IE 9]>
-      <script 
src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js";></script>
-      <script 
src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js";></script>
-    <![endif]-->
-  </head>
-  <body>
-    
-    
-
-
-
-
-    <!-- Top navbar. -->
-    <nav class="navbar navbar-default navbar-fixed-top">
-      <div class="container">
-        <!-- The logo. -->
-        <div class="navbar-header">
-          <button type="button" class="navbar-toggle collapsed" 
data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
-            <span class="icon-bar"></span>
-            <span class="icon-bar"></span>
-            <span class="icon-bar"></span>
-          </button>
-          <div class="navbar-logo">
-            <a href="http://flink.apache.org";><img alt="Apache Flink" 
src="http://flink.apache.org/docs/0.9/page/img/navbar-brand-logo.jpg";></a>
-          </div>
-        </div><!-- /.navbar-header -->
-
-        <!-- The navigation links. -->
-        <div class="collapse navbar-collapse" 
id="bs-example-navbar-collapse-1">
-          <ul class="nav navbar-nav">
-            <li><a 
href="http://flink.apache.org/docs/0.9/index.html";>Overview<span 
class="hidden-sm hidden-xs"> 0.9.0</span></a></li>
-
-            <!-- Setup -->
-            <li class="dropdown">
-              <a href="http://flink.apache.org/docs/0.9/setup"; 
class="dropdown-toggle" data-toggle="dropdown" role="button" 
aria-expanded="false">Setup <span class="caret"></span></a>
-              <ul class="dropdown-menu" role="menu">
-                <li><a 
href="http://flink.apache.org/docs/0.9/setup/building.html";>Get Flink 
0.9-SNAPSHOT</a></li>
-
-                <li class="divider"></li>
-                <li role="presentation" 
class="dropdown-header"><strong>Deployment</strong></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/setup/local_setup.html"; 
class="active">Local</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/setup/cluster_setup.html";>Cluster 
(Standalone)</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/setup/yarn_setup.html";>YARN</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/setup/gce_setup.html";>GCloud</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/setup/flink_on_tez.html";>Flink on Tez 
<span class="badge">Beta</span></a></li>
-
-                <li class="divider"></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/setup/config.html";>Configuration</a></li>
-              </ul>
-            </li>
-
-            <!-- Programming Guides -->
-            <li class="dropdown">
-              <a href="http://flink.apache.org/docs/0.9/apis"; 
class="dropdown-toggle" data-toggle="dropdown" role="button" 
aria-expanded="false">Programming Guides <span class="caret"></span></a>
-              <ul class="dropdown-menu" role="menu">
-                <li><a 
href="http://flink.apache.org/docs/0.9/apis/programming_guide.html";><strong>Batch:
 DataSet API</strong></a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/apis/streaming_guide.html";><strong>Streaming:
 DataStream API</strong> <span class="badge">Beta</span></a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/apis/python.html";>Python API <span 
class="badge">Beta</span></a></li>
-
-                <li class="divider"></li>
-                <li><a href="scala_shell.html">Interactive Scala Shell</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/apis/dataset_transformations.html";>Dataset
 Transformations</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/apis/best_practices.html";>Best 
Practices</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/apis/example_connectors.html";>Connectors</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/apis/examples.html";>Examples</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/apis/local_execution.html";>Local 
Execution</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/apis/cluster_execution.html";>Cluster 
Execution</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/apis/cli.html";>Command Line 
Interface</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/apis/web_client.html";>Web Client</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/apis/iterations.html";>Iterations</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/apis/java8.html";>Java 8</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/apis/hadoop_compatibility.html";>Hadoop 
Compatability <span class="badge">Beta</span></a></li>
-              </ul>
-            </li>
-
-            <!-- Libraries -->
-            <li class="dropdown">
-              <a href="http://flink.apache.org/docs/0.9/libs"; 
class="dropdown-toggle" data-toggle="dropdown" role="button" 
aria-expanded="false">Libraries <span class="caret"></span></a>
-                <ul class="dropdown-menu" role="menu">
-                  <li><a 
href="http://flink.apache.org/docs/0.9/libs/spargel_guide.html";>Graphs: 
Spargel</a></li>
-                  <li><a 
href="http://flink.apache.org/docs/0.9/libs/gelly_guide.html";>Graphs: Gelly 
<span class="badge">Beta</span></a></li>
-                  <li><a 
href="http://flink.apache.org/docs/0.9/libs/ml/";>Machine Learning <span 
class="badge">Beta</span></a></li>
-                  <li><a 
href="http://flink.apache.org/docs/0.9/libs/table.html";>Relational: Table <span 
class="badge">Beta</span></a></li>
-              </ul>
-            </li>
-
-            <!-- Internals -->
-            <li class="dropdown">
-              <a href="http://flink.apache.org/docs/0.9/internals"; 
class="dropdown-toggle" data-toggle="dropdown" role="button" 
aria-expanded="false">Internals <span class="caret"></span></a>
-              <ul class="dropdown-menu" role="menu">
-                <li role="presentation" 
class="dropdown-header"><strong>Contribute</strong></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/internals/how_to_contribute.html";>How to 
Contribute</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/internals/coding_guidelines.html";>Coding 
Guidelines</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/internals/ide_setup.html";>IDE 
Setup</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/internals/logging.html";>Logging</a></li>
-                <li class="divider"></li>
-                <li role="presentation" 
class="dropdown-header"><strong>Internals</strong></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/internals/general_arch.html";>Architecture
 &amp; Process Model</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/internals/types_serialization.html";>Type 
Extraction &amp; Serialization</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/internals/job_scheduling.html";>Jobs 
&amp; Scheduling</a></li>
-                <li><a 
href="http://flink.apache.org/docs/0.9/internals/add_operator.html";>How-To: Add 
an Operator</a></li>
-              </ul>
-            </li>
-          </ul>
-          <form class="navbar-form navbar-right hidden-sm hidden-md" 
role="search" action="http://flink.apache.org/docs/0.9/search-results.html";>
-            <div class="form-group">
-              <input type="text" class="form-control" name="q" 
placeholder="Search all pages">
-            </div>
-            <button type="submit" class="btn btn-default">Search</button>
-          </form>
-        </div><!-- /.navbar-collapse -->
-      </div><!-- /.container -->
-    </nav>
-
-
-    
-
-    <!-- Main content. -->
-    <div class="container">
-      
-      
-<div class="row">
-  <div class="col-sm-10 col-sm-offset-1">
-    <h1>Flink Stream Processing API <span class="badge">Beta</span></h1>
-
-
-
-<p><a href="#top"></a></p>
-
-<p>Flink Streaming is a system for high-throughput, low-latency data stream 
processing. Flink Streaming natively supports <a 
href="#stateful-computation">stateful computation</a>, data-driven <a 
href="#window-operators">windowing semantics</a> and <a 
href="#iterations">iterative</a> stream processing. The system can connect to 
and process data streams from different data sources like file sources, web 
sockets, message queues (Apache Kafka, RabbitMQ, Twitter Streaming API …), 
and also from any user defined data sources. Data streams can be transformed 
and modified to create new data streams using high-level functions similar to 
the ones provided by the batch processing API.</p>
-
-<ul id="markdown-toc">
-  <li><a href="#flink-streaming-api" 
id="markdown-toc-flink-streaming-api">Flink Streaming API</a></li>
-  <li><a href="#example-program" id="markdown-toc-example-program">Example 
Program</a></li>
-  <li><a href="#program-skeleton" id="markdown-toc-program-skeleton">Program 
Skeleton</a></li>
-  <li><a href="#basics" id="markdown-toc-basics">Basics</a>    <ul>
-      <li><a href="#datastream" 
id="markdown-toc-datastream">DataStream</a></li>
-      <li><a href="#object-reuse-behavior" 
id="markdown-toc-object-reuse-behavior">Object Reuse Behavior</a></li>
-      <li><a href="#partitioning" 
id="markdown-toc-partitioning">Partitioning</a></li>
-      <li><a href="#connecting-to-the-outside-world" 
id="markdown-toc-connecting-to-the-outside-world">Connecting to the outside 
world</a></li>
-    </ul>
-  </li>
-  <li><a href="#transformations" 
id="markdown-toc-transformations">Transformations</a>    <ul>
-      <li><a href="#basic-transformations" 
id="markdown-toc-basic-transformations">Basic transformations</a></li>
-      <li><a href="#grouped-operators" 
id="markdown-toc-grouped-operators">Grouped operators</a></li>
-      <li><a href="#aggregations" 
id="markdown-toc-aggregations">Aggregations</a></li>
-      <li><a href="#window-operators" 
id="markdown-toc-window-operators">Window operators</a></li>
-      <li><a href="#temporal-database-style-operators" 
id="markdown-toc-temporal-database-style-operators">Temporal database style 
operators</a></li>
-      <li><a href="#co-operators" id="markdown-toc-co-operators">Co 
operators</a></li>
-      <li><a href="#output-splitting" 
id="markdown-toc-output-splitting">Output splitting</a></li>
-      <li><a href="#iterations" 
id="markdown-toc-iterations">Iterations</a></li>
-      <li><a href="#rich-functions" id="markdown-toc-rich-functions">Rich 
functions</a></li>
-    </ul>
-  </li>
-  <li><a href="#stateful-computation" 
id="markdown-toc-stateful-computation">Stateful computation</a></li>
-  <li><a href="#lambda-expressions-with-java-8" 
id="markdown-toc-lambda-expressions-with-java-8">Lambda expressions with Java 
8</a></li>
-  <li><a href="#operator-settings" 
id="markdown-toc-operator-settings">Operator Settings</a>    <ul>
-      <li><a href="#parallelism" 
id="markdown-toc-parallelism">Parallelism</a></li>
-      <li><a href="#buffer-timeout" id="markdown-toc-buffer-timeout">Buffer 
timeout</a></li>
-    </ul>
-  </li>
-  <li><a href="#stream-connectors" id="markdown-toc-stream-connectors">Stream 
connectors</a>    <ul>
-      <li><a href="#apache-kafka" id="markdown-toc-apache-kafka">Apache 
Kafka</a></li>
-      <li><a href="#rabbitmq" id="markdown-toc-rabbitmq">RabbitMQ</a></li>
-      <li><a href="#twitter-streaming-api" 
id="markdown-toc-twitter-streaming-api">Twitter Streaming API</a></li>
-      <li><a href="#docker-containers-for-connectors" 
id="markdown-toc-docker-containers-for-connectors">Docker containers for 
connectors</a></li>
-    </ul>
-  </li>
-</ul>
-
-<h2 id="flink-streaming-api">Flink Streaming API</h2>
-
-<p>The Streaming API is currently part of the <em>flink-staging</em> Maven 
project. All relevant classes are located in the 
<em>org.apache.flink.streaming</em> package.</p>
-
-<p>Add the following dependency to your <code>pom.xml</code> to use the Flink 
Streaming.</p>
-
-<div class="codetabs">
-  <div data-lang="java">
-
-    <div class="highlight"><pre><code class="language-xml" 
data-lang="xml"><span class="nt">&lt;dependency&gt;</span>
-  <span class="nt">&lt;groupId&gt;</span>org.apache.flink<span 
class="nt">&lt;/groupId&gt;</span>
-  <span class="nt">&lt;artifactId&gt;</span>flink-streaming-core<span 
class="nt">&lt;/artifactId&gt;</span>
-  <span class="nt">&lt;version&gt;</span>0.9.0<span 
class="nt">&lt;/version&gt;</span>
-<span class="nt">&lt;/dependency&gt;</span>
-<span class="nt">&lt;dependency&gt;</span>
-  <span class="nt">&lt;groupId&gt;</span>org.apache.flink<span 
class="nt">&lt;/groupId&gt;</span>
-  <span class="nt">&lt;artifactId&gt;</span>flink-clients<span 
class="nt">&lt;/artifactId&gt;</span>
-  <span class="nt">&lt;version&gt;</span>0.9.0<span 
class="nt">&lt;/version&gt;</span>
-<span class="nt">&lt;/dependency&gt;</span></code></pre></div>
-
-  </div>
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-xml" 
data-lang="xml"><span class="nt">&lt;dependency&gt;</span>
-  <span class="nt">&lt;groupId&gt;</span>org.apache.flink<span 
class="nt">&lt;/groupId&gt;</span>
-  <span class="nt">&lt;artifactId&gt;</span>flink-streaming-scala<span 
class="nt">&lt;/artifactId&gt;</span>
-  <span class="nt">&lt;version&gt;</span>0.9.0<span 
class="nt">&lt;/version&gt;</span>
-<span class="nt">&lt;/dependency&gt;</span>
-<span class="nt">&lt;dependency&gt;</span>
-  <span class="nt">&lt;groupId&gt;</span>org.apache.flink<span 
class="nt">&lt;/groupId&gt;</span>
-  <span class="nt">&lt;artifactId&gt;</span>flink-clients<span 
class="nt">&lt;/artifactId&gt;</span>
-  <span class="nt">&lt;version&gt;</span>0.9.0<span 
class="nt">&lt;/version&gt;</span>
-<span class="nt">&lt;/dependency&gt;</span></code></pre></div>
-
-  </div>
-</div>
-
-<p>In order to create your own Flink Streaming program, we encourage you to 
start with the <a href="#program-skeleton">skeleton</a> and gradually add your 
own <a href="#transformations">transformations</a>. The remaining sections act 
as references for additional transformations and advanced features.</p>
-
-<h2 id="example-program">Example Program</h2>
-
-<p>The following program is a complete, working example of streaming 
WordCount, that incrementally counts the words coming from a web socket. You 
can copy &amp; paste the code to run it locally.</p>
-
-<div class="codetabs">
-  <div data-lang="java">
-
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="kd">public</span> <span class="kd">class</span> 
<span class="nc">StreamingWordCount</span> <span class="o">{</span>
-
-    <span class="kd">public</span> <span class="kd">static</span> <span 
class="kt">void</span> <span class="nf">main</span><span 
class="o">(</span><span class="n">String</span><span class="o">[]</span> <span 
class="n">args</span><span class="o">)</span> <span class="o">{</span>
-
-        <span class="n">StreamExecutionEnvironment</span> <span 
class="n">env</span> <span class="o">=</span> <span 
class="n">StreamExecutionEnvironment</span><span class="o">.</span><span 
class="na">getExecutionEnvironment</span><span class="o">();</span>
-        
-        <span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;</span> <span 
class="n">dataStream</span> <span class="o">=</span> <span class="n">env</span>
-                <span class="o">.</span><span 
class="na">socketTextStream</span><span class="o">(</span><span 
class="s">&quot;localhost&quot;</span><span class="o">,</span> <span 
class="mi">9999</span><span class="o">)</span>
-                <span class="o">.</span><span class="na">flatMap</span><span 
class="o">(</span><span class="k">new</span> <span 
class="nf">Splitter</span><span class="o">())</span>
-                <span class="o">.</span><span class="na">groupBy</span><span 
class="o">(</span><span class="mi">0</span><span class="o">)</span>
-                <span class="o">.</span><span class="na">sum</span><span 
class="o">(</span><span class="mi">1</span><span class="o">);</span>
-        
-        <span class="n">dataStream</span><span class="o">.</span><span 
class="na">print</span><span class="o">();</span>
-        
-        <span class="n">env</span><span class="o">.</span><span 
class="na">execute</span><span class="o">(</span><span class="s">&quot;Socket 
Stream WordCount&quot;</span><span class="o">);</span>
-    <span class="o">}</span>
-    
-    <span class="kd">public</span> <span class="kd">static</span> <span 
class="kd">class</span> <span class="nc">Splitter</span> <span 
class="kd">implements</span> <span class="n">FlatMapFunction</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
-        <span class="nd">@Override</span>
-        <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> 
<span class="n">sentence</span><span class="o">,</span> <span 
class="n">Collector</span><span class="o">&lt;</span><span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;</span> <span 
class="n">out</span><span class="o">)</span> <span class="kd">throws</span> 
<span class="n">Exception</span> <span class="o">{</span>
-            <span class="k">for</span> <span class="o">(</span><span 
class="n">String</span> <span class="nl">word:</span> <span 
class="n">sentence</span><span class="o">.</span><span 
class="na">split</span><span class="o">(</span><span class="s">&quot; 
&quot;</span><span class="o">))</span> <span class="o">{</span>
-                <span class="n">out</span><span class="o">.</span><span 
class="na">collect</span><span class="o">(</span><span class="k">new</span> 
<span class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;(</span><span 
class="n">word</span><span class="o">,</span> <span class="mi">1</span><span 
class="o">));</span>
-            <span class="o">}</span>
-        <span class="o">}</span>
-    <span class="o">}</span>
-    
-<span class="o">}</span></code></pre></div>
-
-  </div>
-
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">object</span> <span 
class="nc">WordCount</span> <span class="o">{</span>
-  <span class="k">def</span> <span class="n">main</span><span 
class="o">(</span><span class="n">args</span><span class="k">:</span> <span 
class="kt">Array</span><span class="o">[</span><span 
class="kt">String</span><span class="o">])</span> <span class="o">{</span>
-
-    <span class="k">val</span> <span class="n">env</span> <span 
class="k">=</span> <span class="nc">StreamExecutionEnvironment</span><span 
class="o">.</span><span class="n">getExecutionEnvironment</span>
-    <span class="k">val</span> <span class="n">text</span> <span 
class="k">=</span> <span class="n">env</span><span class="o">.</span><span 
class="n">socketTextStream</span><span class="o">(</span><span 
class="s">&quot;localhost&quot;</span><span class="o">,</span> <span 
class="mi">9999</span><span class="o">)</span>
-
-    <span class="k">val</span> <span class="n">counts</span> <span 
class="k">=</span> <span class="n">text</span><span class="o">.</span><span 
class="n">flatMap</span> <span class="o">{</span> <span class="k">_</span><span 
class="o">.</span><span class="n">toLowerCase</span><span 
class="o">.</span><span class="n">split</span><span class="o">(</span><span 
class="s">&quot;\\W+&quot;</span><span class="o">)</span> <span 
class="n">filter</span> <span class="o">{</span> <span class="k">_</span><span 
class="o">.</span><span class="n">nonEmpty</span> <span class="o">}</span> 
<span class="o">}</span>
-      <span class="o">.</span><span class="n">map</span> <span 
class="o">{</span> <span class="o">(</span><span class="k">_</span><span 
class="o">,</span> <span class="mi">1</span><span class="o">)</span> <span 
class="o">}</span>
-      <span class="o">.</span><span class="n">groupBy</span><span 
class="o">(</span><span class="mi">0</span><span class="o">)</span>
-      <span class="o">.</span><span class="n">sum</span><span 
class="o">(</span><span class="mi">1</span><span class="o">)</span>
-
-    <span class="n">counts</span><span class="o">.</span><span 
class="n">print</span>
-
-    <span class="n">env</span><span class="o">.</span><span 
class="n">execute</span><span class="o">(</span><span class="s">&quot;Scala 
Socket Stream WordCount&quot;</span><span class="o">)</span>
-  <span class="o">}</span>
-<span class="o">}</span></code></pre></div>
-
-  </div>
-
-</div>
-
-<p>To run the example program, start the input stream with netcat first from a 
terminal:</p>
-
-<div class="highlight"><pre><code class="language-bash">nc -lk 
9999</code></pre></div>
-
-<p>The lines typed to this terminal will be the source data stream for your 
streaming job.</p>
-
-<p><a href="#top">Back to top</a></p>
-
-<h2 id="program-skeleton">Program Skeleton</h2>
-
-<div class="codetabs">
-  <div data-lang="java">
-
-    <p>As presented in the <a href="#example-program">example</a>, a Flink 
Streaming program looks almost identical to a regular Flink program. Each 
stream processing program consists of the following parts:</p>
-
-    <ol>
-      <li>Obtaining a <code>StreamExecutionEnvironment</code>,</li>
-      <li>Connecting to data stream sources,</li>
-      <li>Specifying transformations on the data streams,</li>
-      <li>Specifying output for the processed data,</li>
-      <li>Executing the program.</li>
-    </ol>
-
-    <p>As these steps are basically the same as in the batch API, we will only 
note the important differences.
-For stream processing jobs, the user needs to obtain a 
<code>StreamExecutionEnvironment</code> in contrast with the <a 
href="programming_guide.html#program-skeleton">batch API</a> where one would 
need an <code>ExecutionEnvironment</code>. Otherwise, the process is 
essentially the same:</p>
-
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">StreamExecutionEnvironment</span><span 
class="o">.</span><span class="na">getExecutionEnvironment</span><span 
class="o">();</span>
-<span class="n">StreamExecutionEnvironment</span><span class="o">.</span><span 
class="na">createLocalEnvironment</span><span class="o">(</span><span 
class="n">parallelism</span><span class="o">);</span>
-<span class="n">StreamExecutionEnvironment</span><span class="o">.</span><span 
class="na">createRemoteEnvironment</span><span class="o">(</span><span 
class="n">String</span> <span class="n">host</span><span class="o">,</span> 
<span class="kt">int</span> <span class="n">port</span><span class="o">,</span> 
<span class="kt">int</span> <span class="n">parallelism</span><span 
class="o">,</span> <span class="n">String</span><span class="o">...</span> 
<span class="n">jarFiles</span><span class="o">);</span></code></pre></div>
-
-    <p>For connecting to data streams the 
<code>StreamExecutionEnvironment</code> has many different methods, from basic 
file sources to completely general user defined data sources. We will go into 
details in the <a href="#basics">basics</a> section.</p>
-
-    <p>For example:</p>
-
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">env</span><span class="o">.</span><span 
class="na">socketTextStream</span><span class="o">(</span><span 
class="n">host</span><span class="o">,</span> <span class="n">port</span><span 
class="o">);</span>
-<span class="n">env</span><span class="o">.</span><span 
class="na">fromElements</span><span class="o">(</span><span 
class="n">elements</span><span class="err">…</span><span class="o">);</span>
-<span class="n">env</span><span class="o">.</span><span 
class="na">addSource</span><span class="o">(</span><span 
class="n">sourceFunction</span><span class="o">)</span></code></pre></div>
-
-    <p>After defining the data stream sources the user can specify 
transformations on the data streams to create a new data stream. Different data 
streams can be also combined together for joint transformations which are being 
showcased in the <a href="#transformations">transformations</a> section.</p>
-
-    <p>For example:</p>
-
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span 
class="na">map</span><span class="o">(</span><span 
class="n">mapFunction</span><span class="o">).</span><span 
class="na">reduce</span><span class="o">(</span><span 
class="n">reduceFunction</span><span class="o">);</span></code></pre></div>
-
-    <p>The processed data can be pushed to different outputs called sinks. The 
user can define their own sinks or use any predefined filesystem, message queue 
or database sink.</p>
-
-    <p>For example:</p>
-
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span 
class="na">writeAsCsv</span><span class="o">(</span><span 
class="n">path</span><span class="o">);</span>
-<span class="n">dataStream</span><span class="o">.</span><span 
class="na">print</span><span class="o">();</span>
-<span class="n">dataStream</span><span class="o">.</span><span 
class="na">addSink</span><span class="o">(</span><span 
class="n">sinkFunction</span><span class="o">)</span></code></pre></div>
-
-    <p>Once the complete program is specified 
<code>execute(programName)</code> is to be called on the 
<code>StreamExecutionEnvironment</code>. This will either execute on the local 
machine or submit the program for execution on a cluster, depending on the 
chosen execution environment.</p>
-
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">env</span><span class="o">.</span><span 
class="na">execute</span><span class="o">(</span><span 
class="n">programName</span><span class="o">);</span></code></pre></div>
-
-  </div>
-
-  <div data-lang="scala">
-
-    <p>As presented in the <a href="#example-program">example</a> a Flink 
Streaming program looks almost identical to a regular Flink program. Each 
stream processing program consists of the following parts:</p>
-
-    <ol>
-      <li>Obtaining a <code>StreamExecutionEnvironment</code>,</li>
-      <li>Connecting to data stream sources,</li>
-      <li>Specifying transformations on the data streams,</li>
-      <li>Specifying output for the processed data,</li>
-      <li>Executing the program.</li>
-    </ol>
-
-    <p>As these steps are basically the same as in the batch API we will only 
note the important differences.
-For stream processing jobs, the user needs to obtain a 
<code>StreamExecutionEnvironment</code> in contrast with the <a 
href="programming_guide.html#program-skeleton">batch API</a> where one would 
need an <code>ExecutionEnvironment</code>. The process otherwise is essentially 
the same:</p>
-
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="nc">StreamExecutionEnvironment</span><span 
class="o">.</span><span class="n">getExecutionEnvironment</span>
-<span class="nc">StreamExecutionEnvironment</span><span 
class="o">.</span><span class="n">createLocalEnvironment</span><span 
class="o">(</span><span class="n">parallelism</span><span class="o">)</span>
-<span class="nc">StreamExecutionEnvironment</span><span 
class="o">.</span><span class="n">createRemoteEnvironment</span><span 
class="o">(</span><span class="n">host</span><span class="k">:</span> <span 
class="kt">String</span><span class="o">,</span> <span 
class="n">port</span><span class="k">:</span> <span 
class="kt">String</span><span class="o">,</span> <span 
class="n">parallelism</span><span class="k">:</span> <span 
class="kt">Int</span><span class="o">,</span> <span 
class="n">jarFiles</span><span class="k">:</span> <span 
class="kt">String*</span><span class="o">)</span></code></pre></div>
-
-    <p>For connecting to data streams the 
<code>StreamExecutionEnvironment</code> has many different methods, from basic 
file sources to completely general user defined data sources. We will go into 
details in the <a href="#basics">basics</a> section.</p>
-
-    <p>For example:</p>
-
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">env</span><span class="o">.</span><span 
class="n">socketTextStream</span><span class="o">(</span><span 
class="n">host</span><span class="o">,</span> <span class="n">port</span><span 
class="o">)</span>
-<span class="n">env</span><span class="o">.</span><span 
class="n">fromElements</span><span class="o">(</span><span 
class="n">elements</span><span class="err">…</span><span class="o">)</span>
-<span class="n">env</span><span class="o">.</span><span 
class="n">addSource</span><span class="o">(</span><span 
class="n">sourceFunction</span><span class="o">)</span></code></pre></div>
-
-    <p>After defining the data stream sources the user can specify 
transformations on the data streams to create a new data stream. Different data 
streams can be also combined together for joint transformations which are being 
showcased in the <a href="#transformations">transformations</a> section.</p>
-
-    <p>For example:</p>
-
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dataStream</span><span 
class="o">.</span><span class="n">map</span><span class="o">(</span><span 
class="n">mapFunction</span><span class="o">).</span><span 
class="n">reduce</span><span class="o">(</span><span 
class="n">reduceFunction</span><span class="o">)</span></code></pre></div>
-
-    <p>The processed data can be pushed to different outputs called sinks. The 
user can define their own sinks or use any predefined filesystem, message queue 
or database sink.</p>
-
-    <p>For example:</p>
-
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dataStream</span><span 
class="o">.</span><span class="n">writeAsCsv</span><span 
class="o">(</span><span class="n">path</span><span class="o">)</span>
-<span class="n">dataStream</span><span class="o">.</span><span 
class="n">print</span>
-<span class="n">dataStream</span><span class="o">.</span><span 
class="n">addSink</span><span class="o">(</span><span 
class="n">sinkFunction</span><span class="o">)</span></code></pre></div>
-
-    <p>Once the complete program is specified 
<code>execute(programName)</code> is to be called on the 
<code>StreamExecutionEnvironment</code>. This will either execute on the local 
machine or submit the program for execution on a cluster, depending on the 
chosen execution environment.</p>
-
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">env</span><span class="o">.</span><span 
class="n">execute</span><span class="o">(</span><span 
class="n">programName</span><span class="o">)</span></code></pre></div>
-
-  </div>
-
-</div>
-
-<p><a href="#top">Back to top</a></p>
-
-<h2 id="basics">Basics</h2>
-
-<h3 id="datastream">DataStream</h3>
-
-<p>The <code>DataStream</code> is the basic data abstraction provided by Flink 
Streaming. It represents a continuous, parallel, immutable stream of data of a 
certain type. By applying transformations the user can create new data streams 
or output the results of the computations. For instance the map transformation 
creates a new <code>DataStream</code> by applying a user defined function on 
each element of a given <code>DataStream</code>.</p>
-
-<p>The transformations may return different data stream types allowing more 
elaborate transformations, for example the <code>groupBy(…)</code> method 
returns a <code>GroupedDataStream</code> which can be used for grouped 
transformations such as aggregating by key. We will discover more elaborate 
data stream types in the upcoming sections.</p>
-
-<h3 id="object-reuse-behavior">Object Reuse Behavior</h3>
-
-<p>Apache Flink is trying to reduce the number of object allocations for 
better performance.</p>
-
-<p>By default, user defined functions (like <code>map()</code> or 
<code>reduce()</code>) are getting new objects on each call
-(or through an iterator). So it is possible to keep references to the objects 
inside the function
-(for example in a List).</p>
-
-<p>There is a switch at the <code>ExecutionConfig</code> which allows users to 
enable the object reuse mode:</p>
-
-<p><code>
-env.getExecutionConfig().enableObjectReuse()
-</code></p>
-
-<p>For mutable types, Flink will reuse object
-instances. In practice that means that a <code>map()</code> function will 
always receive the same object
-instance (with its fields set to new values). The object reuse mode will lead 
to better performance
-because fewer objects are created, but the user has to manually take care of 
what they are doing
-with the object references.</p>
-
-<h3 id="partitioning">Partitioning</h3>
-
-<p>Partitioning controls how individual data points of a stream are 
distributed among the parallel instances of the transformation operators. This 
also controls the ordering of the records in the <code>DataStream</code>. There 
is partial ordering guarantee for the outputs with respect to the partitioning 
scheme (outputs produced from each partition are guaranteed to arrive in the 
order they were produced).</p>
-
-<p>There are several partitioning types supported in Flink Streaming:</p>
-
-<ul>
-  <li><em>Forward (default)</em>: Forward partitioning directs the output data 
to the next operator on the same machine (if possible) avoiding expensive 
network I/O. If there are more processing nodes than inputs or vice versa the 
load is distributed among the extra nodes in a round-robin fashion. This is the 
default partitioner.
-Usage: <code>dataStream.forward()</code></li>
-  <li><em>Shuffle</em>: Shuffle partitioning randomly partitions the output 
data stream to the next operator using uniform distribution. Use this only when 
it is important that the partitioning is randomised. If you only care about an 
even load use <em>Rebalance</em>.
-Usage: <code>dataStream.shuffle()</code></li>
-  <li><em>Rebalance</em>: Rebalance partitioning directs the output data 
stream to the next operator in a round-robin fashion, achieving a balanced 
distribution.
-Usage: <code>dataStream.rebalance()</code></li>
-  <li><em>Field/Key Partitioning</em>: Field/Key partitioning partitions the 
output data stream based on the hash code of a selected key of the tuples. Data 
points with the same key are directed to the same operator instance. 
-Usage: <code>dataStream.partitionByHash(fields…)</code></li>
-  <li><em>Field/Key Grouping</em>: Field/Key grouping takes partitioning one 
step further and separates the elements to disjoint groups based on the hash 
code. These groups are processed separately by the next downstream operator. 
-Usage: <code>dataStream.groupBy(fields…)</code></li>
-  <li><em>Broadcast</em>: Broadcast partitioning sends the output data stream 
to all parallel instances of the next operator.
-Usage: <code>dataStream.broadcast()</code></li>
-  <li><em>Global</em>: All data points are directed to the first instance of 
the operator. 
-Usage: <code>dataStream.global()</code></li>
-</ul>
-
-<p>By default <em>Forward</em> partitioning is used.</p>
-
-<p>Partitioning does not remain in effect after a transformation, so it needs 
to be set again for subsequent operations.</p>
-
-<h3 id="connecting-to-the-outside-world">Connecting to the outside world</h3>
-
-<p>The user is expected to connect to the outside world through the source and 
the sink interfaces.</p>
-
-<h4 id="sources">Sources</h4>
-
-<p>Sources can be created by using 
<code>StreamExecutionEnvironment.addSource(sourceFunction)</code>. 
-Either use one of the source functions that come with Flink or write a custom 
source
-by implementing the <code>SourceFunction</code> interface. By default, sources 
run with
-parallelism of 1. To create parallel sources the user’s source function 
needs to implement
-<code>ParallelSourceFunction</code> or extend 
<code>RichParallelSourceFunction</code> in which cases the source will have
-the parallelism of the environment. The parallelism for 
ParallelSourceFunctions can be changed
-after creation by using <code>source.setParallelism(parallelism)</code>.</p>
-
-<p>The <code>SourceFunction</code> interface has two methods: 
<code>run(SourceContext)</code> and <code>cancel()</code>. The 
<code>run()</code>
-method is not expected to return until the source has either finished by 
itself or received
-a cancel request. The source can communicate with the outside world using the 
source context. For
-example, the <code>emit(element)</code> method is used to emit one element 
from the source. Most sources will
-have an infinite while loop inside the <code>run()</code> method to read from 
the input and emit elements.
-Upon invocation of the <code>cancel()</code> method the source is required to 
break out of its internal
-loop and return from the <code>run()</code> method. A common implementation 
for this is the following:</p>
-
-<div class="highlight"><pre><code class="language-java" data-lang="java"><span 
class="kd">public</span> <span class="kd">static</span> <span 
class="kd">class</span> <span class="nc">MySource</span> <span 
class="kd">implements</span> <span class="n">SourceFunction</span><span 
class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> 
<span class="o">{</span>
-
-    <span class="c1">// utility for job cancellation</span>
-    <span class="kd">private</span> <span class="kd">volatile</span> <span 
class="kt">boolean</span> <span class="n">isRunning</span> <span 
class="o">=</span> <span class="kc">false</span><span class="o">;</span>
-    
-    <span class="nd">@Override</span>
-    <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">run</span><span class="o">(</span><span 
class="n">SourceContext</span><span class="o">&lt;</span><span 
class="n">Long</span><span class="o">&gt;</span> <span 
class="n">ctx</span><span class="o">)</span> <span class="kd">throws</span> 
<span class="n">Exception</span> <span class="o">{</span>
-        <span class="n">isRunning</span> <span class="o">=</span> <span 
class="kc">true</span><span class="o">;</span>
-        <span class="k">while</span> <span class="o">(</span><span 
class="n">isRunning</span><span class="o">)</span> <span class="o">{</span>
-            <span class="c1">// the source runs, isRunning flag should be 
checked frequently</span>
-            <span class="o">}</span>
-        <span class="o">}</span>
-    <span class="o">}</span>
-
-    <span class="c1">// invoked by the framework in case of job 
cancellation</span>
-    <span class="nd">@Override</span>
-    <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">cancel</span><span class="o">()</span> <span class="o">{</span>
-        <span class="n">isRunning</span> <span class="o">=</span> <span 
class="kc">false</span><span class="o">;</span>
-    <span class="o">}</span>
-
-<span class="o">}</span></code></pre></div>
-
-<p>In addition to the bounded data sources (with similar method signatures as 
the
-<a href="programming_guide.html#data-sources">batch API</a>) there are several 
predefined stream sources
-accessible from the <code>StreamExecutionEnvironment</code>:</p>
-
-<ul>
-  <li>
-    <p><em>Socket text stream</em>: Creates a new <code>DataStream</code> that 
contains the strings received
-from the given socket. Strings are decoded by the system’s default character 
set. The user
-can optionally set the delimiters or the number of connection retries in case 
of errors.
-Usage: <code>env.socketTextStream(hostname, port,…)</code></p>
-  </li>
-  <li>
-    <p><em>Text file stream</em>: Creates a new <code>DataStream</code> that 
contains the lines of the files created
-(or modified) in a given directory. The system continuously monitors the given 
path, and processes
-any new files or modifications based on the settings. The file will be read 
with the system’s
-default character set.
-Usage: <code>env.readFileStream(String path, long checkFrequencyMillis, 
WatchType watchType)</code></p>
-  </li>
-  <li>
-    <p><em>Message queue connectors</em>: There are pre-implemented connectors 
for a number of popular message
-queue services, please refer to the section on <a 
href="#stream-connectors">connectors</a> for more details.</p>
-  </li>
-  <li>
-    <p><em>Custom source</em>: Creates a new <code>DataStream</code> by using 
a user defined <code>SourceFunction</code> implementation.
-Usage: <code>env.addSource(sourceFunction)</code></p>
-  </li>
-</ul>
-
-<h4 id="sinks">Sinks</h4>
-
-<p><code>DataStreamSink</code> represents the different outputs of Flink 
Streaming programs. The user can either define their own 
<code>SinkFunction</code> implementation or choose one of the available 
implementations (methods of <code>DataStream</code>).</p>
-
-<p>For example:</p>
-
-<ul>
-  <li><code>dataStream.print()</code> – Writes the <code>DataStream</code> 
to the standard output, practical for testing purposes</li>
-  <li><code>dataStream.writeAsText(parameters)</code> – Writes the 
<code>DataStream</code> to a text file</li>
-  <li><code>dataStream.writeAsCsv(parameters)</code> – Writes the 
<code>DataStream</code> to CSV format</li>
-  <li><code>dataStream.addSink(sinkFunction)</code> – Custom sink 
implementation</li>
-</ul>
-
-<p>There are pre-implemented connectors for a number of the most popular 
message queue services, please refer to the section on <a 
href="#stream-connectors">connectors</a> for more detail.</p>
-
-<p><a href="#top">Back to top</a></p>
-
-<h2 id="transformations">Transformations</h2>
-
-<p>Transformations, also called operators, represent the users’ business 
logic on the data stream. Operators consume data streams and produce new data 
streams. The user can chain and combine multiple operators on the data stream 
to produce the desired processing steps. Most of the operators work very 
similar to the batch Flink API allowing developers to reason about 
<code>DataStream</code> the same way as they would about <code>DataSet</code>. 
At the same time there are operators that exploit the streaming nature of the 
data to allow advanced functionality.</p>
-
-<h3 id="basic-transformations">Basic transformations</h3>
-
-<p>Basic transformations can be seen as functions that operate on records of 
the data stream.</p>
-
-<div class="codetabs">
-  <div data-lang="java">
-
-    <p><br /></p>
-
-    <table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-
-  <tbody>
-    <tr>
-      <td><strong>Map</strong></td>
-      <td>
-        <p>Takes one element and produces one element. A map that doubles the 
values of the input stream:</p>
-
-<div class="highlight"><pre><code class="language-java" data-lang="java"><span 
class="n">dataStream</span><span class="o">.</span><span 
class="na">map</span><span class="o">(</span><span class="k">new</span> <span 
class="n">MapFunction</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
-            <span class="nd">@Override</span>
-            <span class="kd">public</span> <span class="n">Integer</span> 
<span class="nf">map</span><span class="o">(</span><span 
class="n">Integer</span> <span class="n">value</span><span class="o">)</span> 
<span class="kd">throws</span> <span class="n">Exception</span> <span 
class="o">{</span>
-                <span class="k">return</span> <span class="mi">2</span> <span 
class="o">*</span> <span class="n">value</span><span class="o">;</span>
-            <span class="o">}</span>
-        <span class="o">});</span></code></pre></div>
-
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>FlatMap</strong></td>
-      <td>
-        <p>Takes one element and produces zero, one, or more elements. A 
flatmap that splits sentences to words:</p>
-
-<div class="highlight"><pre><code class="language-java" data-lang="java"><span 
class="n">dataStream</span><span class="o">.</span><span 
class="na">flatMap</span><span class="o">(</span><span class="k">new</span> 
<span class="n">FlatMapFunction</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
-            <span class="nd">@Override</span>
-            <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> 
<span class="n">value</span><span class="o">,</span> <span 
class="n">Collector</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">out</span><span class="o">)</span> 
-                <span class="kd">throws</span> <span 
class="n">Exception</span> <span class="o">{</span>
-                <span class="k">for</span><span class="o">(</span><span 
class="n">String</span> <span class="nl">word:</span> <span 
class="n">value</span><span class="o">.</span><span 
class="na">split</span><span class="o">(</span><span class="s">&quot; 
&quot;</span><span class="o">)){</span>
-                    <span class="n">out</span><span class="o">.</span><span 
class="na">collect</span><span class="o">(</span><span 
class="n">word</span><span class="o">);</span>
-                <span class="o">}</span>
-            <span class="o">}</span>
-        <span class="o">});</span></code></pre></div>
-
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Filter</strong></td>
-      <td>
-        <p>Evaluates a boolean function for each element and retains those for 
which the function returns true.
-       <br />
-       <br />
-        A filter that filters out zero values:
-        </p>
-
-<div class="highlight"><pre><code class="language-java" data-lang="java"><span 
class="n">dataStream</span><span class="o">.</span><span 
class="na">filter</span><span class="o">(</span><span class="k">new</span> 
<span class="n">FilterFunction</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span> 
-            <span class="nd">@Override</span>
-            <span class="kd">public</span> <span class="kt">boolean</span> 
<span class="nf">filter</span><span class="o">(</span><span 
class="n">Integer</span> <span class="n">value</span><span class="o">)</span> 
<span class="kd">throws</span> <span class="n">Exception</span> <span 
class="o">{</span>
-                <span class="k">return</span> <span class="n">value</span> 
<span class="o">!=</span> <span class="mi">0</span><span class="o">;</span>
-            <span class="o">}</span>
-        <span class="o">});</span></code></pre></div>
-
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Reduce</strong></td>
-      <td>
-        <p>Combines a stream of elements into another stream by repeatedly 
combining two elements
-        into one and emits the current state after every reduction. Reduce may 
be applied on a full, windowed or grouped data stream.
-        <br />
-        
-        <strong>IMPORTANT:</strong> The streaming and the batch reduce 
functions have different semantics. A streaming reduce on a data stream emits 
the current reduced value for every new element on a data stream. On a windowed 
data stream it works as a batch reduce: it produces at most one value per 
window.
-        <br />
-       <br />
-         A reducer that sums up the incoming stream, the result is a stream of 
intermediate sums:</p>
-
-<div class="highlight"><pre><code class="language-java" data-lang="java"><span 
class="n">dataStream</span><span class="o">.</span><span 
class="na">reduce</span><span class="o">(</span><span class="k">new</span> 
<span class="n">ReduceFunction</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
-            <span class="nd">@Override</span>
-            <span class="kd">public</span> <span class="n">Integer</span> 
<span class="nf">reduce</span><span class="o">(</span><span 
class="n">Integer</span> <span class="n">value1</span><span class="o">,</span> 
<span class="n">Integer</span> <span class="n">value2</span><span 
class="o">)</span> 
-            <span class="kd">throws</span> <span class="n">Exception</span> 
<span class="o">{</span>
-                <span class="k">return</span> <span class="n">value1</span> 
<span class="o">+</span> <span class="n">value2</span><span class="o">;</span>
-            <span class="o">}</span>
-        <span class="o">});</span></code></pre></div>
-
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Fold</strong></td>
-      <td>
-        <p>Combines a stream element by element with an initial aggregator 
value. Fold may be applied on a full, windowed or grouped data stream.
-        <br />
-         A folder that appends strings one by one to the empty string:</p>
-
-<div class="highlight"><pre><code class="language-java" data-lang="java"><span 
class="n">dataStream</span><span class="o">.</span><span 
class="na">fold</span><span class="o">(</span><span 
class="s">&quot;&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">FoldFunction</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;()</span> <span 
class="o">{</span>
-            <span class="nd">@Override</span>
-            <span class="kd">public</span> <span class="n">String</span> <span 
class="nf">fold</span><span class="o">(</span><span class="n">String</span> 
<span class="n">accumulator</span><span class="o">,</span> <span 
class="n">String</span> <span class="n">value</span><span class="o">)</span> 
<span class="kd">throws</span> <span class="n">Exception</span> <span 
class="o">{</span>
-                <span class="k">return</span> <span 
class="n">accumulator</span> <span class="o">+</span> <span 
class="n">value</span><span class="o">;</span>
-            <span class="o">}</span>
-       <span class="o">});</span></code></pre></div>
-
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Union</strong></td>
-      <td>
-        <p>Union of two or more data streams creating a new stream containing 
all the elements from all the streams.</p>
-
-<div class="highlight"><pre><code class="language-java" data-lang="java"><span 
class="n">dataStream</span><span class="o">.</span><span 
class="na">union</span><span class="o">(</span><span 
class="n">otherStream1</span><span class="o">,</span> <span 
class="n">otherStream2</span><span class="o">,</span> <span 
class="err">…</span><span class="o">)</span></code></pre></div>
-
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-    <hr />
-
-    <p>The following transformations are available on data streams of 
Tuples:</p>
-
-    <table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td><strong>Project</strong></td>
-      <td>
-        <p>Selects a subset of fields from the tuples</p>
-
-<div class="highlight"><pre><code class="language-java" data-lang="java"><span 
class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">Tuple3</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Double</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">in</span> <span class="o">=</span> <span class="c1">// [...]</span>
-<span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;</span> <span 
class="n">out</span> <span class="o">=</span> <span class="n">in</span><span 
class="o">.</span><span class="na">project</span><span class="o">(</span><span 
class="mi">2</span><span class="o">,</span><span class="mi">0</span><span 
class="o">);</span></code></pre></div>
-
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-  </div>
-
-  <div data-lang="scala">
-
-    <p><br /></p>
-
-    <table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-
-  <tbody>
-
-    <tr>
-      <td><strong>Map</strong></td>
-      <td>
-        <p>Takes one element and produces one element. A map that doubles the 
values of the input stream:</p>
-
-<div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dataStream</span><span 
class="o">.</span><span class="n">map</span><span class="o">{</span> <span 
class="n">x</span> <span class="k">=&gt;</span> <span class="n">x</span> <span 
class="o">*</span> <span class="mi">2</span> <span 
class="o">}</span></code></pre></div>
-
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>FlatMap</strong></td>
-      <td>
-        <p>Takes one element and produces zero, one, or more elements. A 
flatmap that splits sentences to words:</p>
-
-<div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">data</span><span class="o">.</span><span 
class="n">flatMap</span> <span class="o">{</span> <span class="n">str</span> 
<span class="k">=&gt;</span> <span class="n">str</span><span 
class="o">.</span><span class="n">split</span><span class="o">(</span><span 
class="s">&quot; &quot;</span><span class="o">)</span> <span 
class="o">}</span></code></pre></div>
-
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Filter</strong></td>
-      <td>
-        <p>Evaluates a boolean function for each element and retains those for 
which the function returns true.
-               <br />
-       <br />
-        A filter that filters out zero values:
-        </p>
-
-<div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dataStream</span><span 
class="o">.</span><span class="n">filter</span><span class="o">{</span> <span 
class="k">_</span> <span class="o">!=</span> <span class="mi">0</span> <span 
class="o">}</span></code></pre></div>
-
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Reduce</strong></td>
-      <td>
-        <p>Combines a stream of elements into another stream by repeatedly 
combining two elements
-        into one and emits the current state after every reduction. Reduce may 
be applied on a full, windowed or grouped data stream.
-        <br />
-        
-        <strong>IMPORTANT:</strong> The streaming and the batch reduce 
functions have different semantics. A streaming reduce on a data stream emits 
the current reduced value for every new element on a data stream. On a windowed 
data stream it works as a batch reduce: it produces at most one value per 
window.
-        <br />
-       <br />
-         A reducer that sums up the incoming stream, the result is a stream of 
intermediate sums:</p>
-
-<div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dataStream</span><span 
class="o">.</span><span class="n">reduce</span><span class="o">{</span> <span 
class="k">_</span> <span class="o">+</span> <span class="k">_</span><span 
class="o">}</span></code></pre></div>
-
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Fold</strong></td>
-        <td>
-        <p>Combines a stream element by element with an initial aggregator 
value. Fold may be applied on a full, windowed or grouped data stream.
-        <br />
-         A folder that appends strings one by one to the empty string:</p>
-
-<div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dataStream</span><span 
class="o">.</span><span class="n">fold</span><span class="o">{</span><span 
class="s">&quot;&quot;</span><span class="o">,</span> <span class="k">_</span> 
<span class="o">+</span> <span class="k">_</span> <span 
class="o">}</span></code></pre></div>
-
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Union</strong></td>
-      <td>
-        <p>Union of two or more data streams creating a new stream containing 
all the elements from all the streams.</p>
-
-<div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dataStream</span><span 
class="o">.</span><span class="n">union</span><span class="o">(</span><span 
class="n">otherStream1</span><span class="o">,</span> <span 
class="n">otherStream2</span><span class="o">,</span> <span 
class="err">…</span><span class="o">)</span></code></pre></div>
-
-      </td>
-    </tr>
-
-  </tbody>
-
-
-</table>
-
-  </div>
-
-</div>
-
-<h3 id="grouped-operators">Grouped operators</h3>
-
-<p>Some transformations require that the elements of a <code>DataStream</code> 
are grouped on some key. The user can create a <code>GroupedDataStream</code> 
by calling the <code>groupBy(key)</code> method of a non-grouped 
<code>DataStream</code>. 
-Keys can be of three types: field positions (applicable for tuple/array 
types), field expressions (applicable for pojo types), KeySelector 
instances.</p>
-
-<p>Aggregation or reduce operators called on <code>GroupedDataStream</code>s 
produce elements on a per group basis.</p>
-
-<h3 id="aggregations">Aggregations</h3>
-
-<p>The Flink Streaming API supports different types of pre-defined 
aggregations of <code>DataStreams</code>. A common property of these operators, 
is that they produce the stream of intermediate aggregate values (just like 
reduce on streams).</p>
-
-<p>Types of aggregations: <code>sum(field)</code>, <code>min(field)</code>, 
<code>max(field)</code>, <code>minBy(field, first)</code>, <code>maxBy(field, 
first)</code>.</p>
-
-<p>With <code>sum</code>, <code>min</code>, and <code>max</code> for every 
incoming tuple the selected field is replaced with the current aggregated 
value. Fields can be selected using either field positions or field expressions 
(similarly to grouping).</p>
-
-<p>With <code>minBy</code> and <code>maxBy</code> the output of the operator 
is the element with the current minimal or maximal value at the given field. If 
more components share the minimum or maximum value, the user can decide if the 
operator should return the first or last element. This can be set by the 
<code>first</code> boolean parameter.</p>
-
-<h3 id="window-operators">Window operators</h3>
-
-<p>Flink streaming provides very flexible data-driven windowing semantics to 
create arbitrary windows (also referred to as discretizations or slices) of the 
data streams and apply reduce, map or aggregation transformations on the 
windows acquired. Windowing can be used for instance to create rolling 
aggregations of the most recent N elements, where N could be defined by Time, 
Count or any arbitrary user defined measure.</p>
-
-<p>The user can control the size (eviction) of the windows and the frequency 
of transformation or aggregation calls (trigger) on them in an intuitive API. 
We will describe the exact semantics of these operators in the <a 
href="#policy-based-windowing">policy based windowing</a> section.</p>
-
-<p>Some examples:</p>
-
-<ul>
-  
<li><code>dataStream.window(eviction).every(trigger).reduceWindow(…)</code></li>
-  
<li><code>dataStream.window(…).every(…).mapWindow(…).flatten()</code></li>
-  
<li><code>dataStream.window(…).every(…).groupBy(…).aggregate(…).getDiscretizedStream()</code></li>
-</ul>
-
-<p>The core abstraction of the Windowing semantics is the 
<code>WindowedDataStream</code> and the <code>StreamWindow</code>. The 
<code>WindowedDataStream</code> is created when we first call the 
<code>window(…)</code> method of the DataStream and represents the windowed 
discretization of the underlying stream. The user can think about it simply as 
a <code>DataStream&lt;StreamWindow&lt;T&gt;&gt;</code> where additional API 
functions are supplied to provide efficient transformations of individual 
windows.</p>
-
-<p>Please note at this point that the <code>.every(…)</code> call belongs 
together with the preceding <code>.window(…)</code> call and does not define 
a new transformation in itself.</p>
-
-<p>The result of a window transformation is again a 
<code>WindowedDataStream</code> which can also be used to further apply other 
windowed computations. In this sense, window transformations define mapping 
from stream windows to stream windows.</p>
-
-<p>The user has different ways of using the result of a window operation:</p>
-
-<ul>
-  <li><code>windowedDataStream.flatten()</code> - streams the results element 
wise and returns a <code>DataStream&lt;T&gt;</code> where T is the type of the 
underlying windowed stream</li>
-  <li><code>windowedDataStream.getDiscretizedStream()</code> - returns a 
<code>DataStream&lt;StreamWindow&lt;T&gt;&gt;</code> for applying some advanced 
logic on the stream windows itself. Be careful here, as at this point, we need 
to materialise the full windows</li>
-  <li>Calling any window transformation further transforms the windows, while 
preserving the windowing logic</li>
-</ul>
-
-<p>The next example would create windows that hold elements of the last 5 
seconds, and the user defined transformation would be executed on the windows 
every second (sliding the window by 1 second):</p>
-
-<div class="codetabs">
-  <div data-lang="java">
-
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span><span 
class="n">Time</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="mi">5</span><span class="o">,</span> <span 
class="n">TimeUnit</span><span class="o">.</span><span 
class="na">SECONDS</span><span class="o">)).</span><span 
class="na">every</span><span class="o">(</span><span class="n">Time</span><span 
class="o">.</span><span class="na">of</span><span class="o">(</span><span 
class="mi">1</span><span class="o">,</span> <span 
class="n">TimeUnit</span><span class="o">.</span><span 
class="na">SECONDS</span><span class="o">));</span></code></pre></div>
-
-  </div>
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dataStream</span><span 
class="o">.</span><span class="n">window</span><span class="o">(</span><span 
class="nc">Time</span><span class="o">.</span><span class="n">of</span><span 
class="o">(</span><span class="mi">5</span><span class="o">,</span> <span 
class="nc">TimeUnit</span><span class="o">.</span><span 
class="nc">SECONDS</span><span class="o">)).</span><span 
class="n">every</span><span class="o">(</span><span class="nc">Time</span><span 
class="o">.</span><span class="n">of</span><span class="o">(</span><span 
class="mi">1</span><span class="o">,</span> <span 
class="nc">TimeUnit</span><span class="o">.</span><span 
class="nc">SECONDS</span><span class="o">))</span></code></pre></div>
-
-  </div>
-</div>
-
-<p>This approach is often referred to as policy based windowing. Different 
policies (count, time, etc.) can be mixed as well, for example to downsample 
our stream, a window that takes the latest 100 elements of the stream every 
minute is created as follows:</p>
-
-<div class="codetabs">
-  <div data-lang="java">
-
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span><span 
class="n">Count</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="mi">100</span><span class="o">)).</span><span 
class="na">every</span><span class="o">(</span><span class="n">Time</span><span 
class="o">.</span><span class="na">of</span><span class="o">(</span><span 
class="mi">1</span><span class="o">,</span> <span 
class="n">TimeUnit</span><span class="o">.</span><span 
class="na">MINUTES</span><span class="o">));</span></code></pre></div>
-
-  </div>
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dataStream</span><span 
class="o">.</span><span class="n">window</span><span class="o">(</span><span 
class="nc">Count</span><span class="o">.</span><span class="n">of</span><span 
class="o">(</span><span class="mi">100</span><span class="o">)).</span><span 
class="n">every</span><span class="o">(</span><span class="nc">Time</span><span 
class="o">.</span><span class="n">of</span><span class="o">(</span><span 
class="mi">1</span><span class="o">,</span> <span 
class="nc">TimeUnit</span><span class="o">.</span><span 
class="nc">MINUTES</span><span class="o">))</span></code></pre></div>
-
-  </div>
-</div>
-
-<p>The user can also omit the <code>every(…)</code> call which results in a 
tumbling window emptying the window after every transformation call.</p>
-
-<p>Several predefined policies are provided in the API, including delta-based, 
count-based and time-based policies. These can be accessed through the static 
methods provided by the <code>PolicyHelper</code> classes:</p>
-
-<ul>
-  <li><code>Time.of(…)</code></li>
-  <li><code>Count.of(…)</code></li>
-  <li><code>Delta.of(…)</code></li>
-  <li><code>FullStream.window()</code></li>
-</ul>
-
-<p>For detailed description of these policies please refer to the <a 
href="http://flink.apache.org/docs/latest/api/java/";>Javadocs</a>.</p>
-
-<h4 id="policy-based-windowing">Policy based windowing</h4>
-<p>The policy based windowing is a highly flexible way to specify stream 
discretisation also called windowing semantics. Two types of policies are used 
for such a specification:</p>
-
-<ul>
-  <li>
-    <p><code>TriggerPolicy</code> defines when to trigger the reduce or 
transformation UDF on the current window and emit the result. In the API it 
completes a window statement such as: <code>window(…).every(…)</code>, 
while the triggering policy is passed within <code>every</code>. In case the 
user wants to use tumbling eviction policy (the window is emptied after the 
transformation) he can omit the <code>.every(…)</code> call and pass the 
trigger policy directly to the <code>.window(…)</code> call.</p>
-  </li>
-  <li>
-    <p><code>EvictionPolicy</code> defines the length of a window as a means 
of a predicate for evicting tuples when they are no longer needed. In the API 
this can be defined by the <code>window(…)</code> operation on a stream. 
There are mostly the same predefined policy types provided as for trigger 
policies.</p>
-  </li>
-</ul>
-
-<p>Trigger and eviction policies work totally independently of each other. The 
eviction policy continuously maintains a window, into which it adds new 
elements and based on the eviction logic removes older elements in the order of 
arrival. The trigger policy on the other hand only decided at each new incoming 
element, whether it should trigger computation (and output results) on the 
currently maintained window.</p>
-
-<p>Several predefined policies are provided in the API, including delta-based, 
punctuation based, count-based and time-based policies. Policies are in general 
UDFs and can implement any custom behaviour.</p>
-
-<p>In addition to the <code>dataStream.window(…).every(…)</code> style, 
users can specifically pass the trigger and eviction policies during the window 
call:</p>
-
-<div class="codetabs">
-  <div data-lang="java">
-
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span><span 
class="n">triggerPolicy</span><span class="o">,</span> <span 
class="n">evictionPolicy</span><span class="o">);</span></code></pre></div>
-
-  </div>
-
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dataStream</span><span 
class="o">.</span><span class="n">window</span><span class="o">(</span><span 
class="n">triggerPolicy</span><span class="o">,</span> <span 
class="n">evictionPolicy</span><span class="o">)</span></code></pre></div>
-
-  </div>
-
-</div>
-
-<p>By default triggers can only trigger when a new element arrives. This might 
not be suitable for all the use-cases with low data rates. To also provide 
triggering between elements, so called active policies can be used (the two 
interfaces controlling this special behaviour is 
<code>ActiveTriggerPolicy</code> and <code>CentralActiveTrigger</code>). The 
predefined time-based policies are already implemented in such a way and can 
hold as an example for user defined active policy implementations.</p>
-
-<p>Time-based trigger and eviction policies can work with user defined 
<code>TimeStamp</code> implementations, these policies already cover most use 
cases.</p>
-
-<h4 id="reduce-on-windowed-data-streams">Reduce on windowed data streams</h4>
-<p>The 
<code>WindowedDataStream&lt;T&gt;.reduceWindow(ReduceFunction&lt;T&gt;)</code> 
transformation calls the user-defined <code>ReduceFunction</code> at every 
trigger on the records currently in the window. The user can also use the 
different pre-implemented streaming aggregations such as <code>sum, min, max, 
minBy</code> and <code>maxBy</code>.</p>
-
-<p>The following is an example for a window reduce that sums the elements in 
the last minute with 10 seconds slide interval:</p>
-
-<div class="codetabs">
-  <div data-lang="java">
-
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span><span 
class="n">Time</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="mi">1</span><span class="o">,</span> <span 
class="n">TimeUnit</span><span class="o">.</span><span 
class="na">MINUTES</span><span class="o">)).</span><span 
class="na">every</span><span class="o">(</span><span class="n">Time</span><span 
class="o">.</span><span class="na">of</span><span class="o">(</span><span 
class="mi">10</span><span class="o">,</span><span 
class="n">TimeUnit</span><span class="o">.</span><span 
class="na">SECONDS</span><span class="o">)).</span><span 
class="na">sum</span><span class="o">(</span><span class="n">field</span><span 
class="o">);</span></code></pre></div>
-
-  </div>
-
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dataStream</span><span 
class="o">.</span><span class="n">window</span><span class="o">(</span><span 
class="nc">Time</span><span class="o">.</span><span class="n">of</span><span 
class="o">(</span><span class="mi">1</span><span class="o">,</span> <span 
class="nc">TimeUnit</span><span class="o">.</span><span 
class="nc">MINUTES</span><span class="o">)).</span><span 
class="n">every</span><span class="o">(</span><span class="nc">Time</span><span 
class="o">.</span><span class="n">of</span><span class="o">(</span><span 
class="mi">10</span><span class="o">,</span><span 
class="nc">TimeUnit</span><span class="o">.</span><span 
class="nc">SECONDS</span><span class="o">)).</span><span 
class="n">sum</span><span class="o">(</span><span class="n">field</span><span 
class="o">)</span></code></pre></div>
-
-  </div>
-
-</div>
-
-<h4 id="map-on-windowed-data-streams">Map on windowed data streams</h4>
-<p>The 
<code>WindowedDataStream&lt;T&gt;.mapWindow(WindowMapFunction&lt;T,O&gt;)</code>
 transformation calls  <code>mapWindow(…)</code> for each 
<code>StreamWindow</code> in the discretised stream, providing access to all 
elements in the window through the iterable interface. At each function call 
the output <code>StreamWindow&lt;O&gt;</code> will consist of all the elements 
collected to the collector. This allows a straightforward way of mapping one 
stream window to another.</p>
-
-<div class="codetabs">
-  <div data-lang="java">
-
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">windowedDataStream</span><span 
class="o">.</span><span class="na">mapWindow</span><span 
class="o">(</span><span class="n">windowMapFunction</span><span 
class="o">);</span></code></pre></div>
-
-  </div>
-
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">windowedDataStream</span><span 
class="o">.</span><span class="n">mapWindow</span><span class="o">(</span><span 
class="n">windowMapFunction</span><span class="o">)</span></code></pre></div>
-
-  </div>
-
-</div>
-
-<h4 id="grouped-transformations-on-windowed-data-streams">Grouped 
transformations on windowed data streams</h4>
-<p>Calling the <code>groupBy(…)</code> method on a windowed stream groups 
the elements by the given fields inside the stream windows. The window sizes 
(evictions) and slide sizes (triggers) will be calculated on the whole stream 
(in a global fashion), but the user defined functions will be applied on a per 
group basis inside the window. This means that for a call 
<code>windowedStream.groupBy(…).reduceWindow(…)</code> will transform each 
window into another window consisting of as many elements as keys in the 
original window, with the reduced values per key. Similarly the 
<code>mapWindow</code> transformation is applied per group as well.</p>
-
-<p>The user can also create discretisation on a per group basis calling 
<code>window(…).every(…)</code> on an already grouped data stream. This 
will apply the discretisation logic independently for each key.</p>
-
-<p>To highlight the differences let us look at two examples.</p>
-
-<p>To get the maximal value for each key on the last 100 elements (global) we 
use the first approach:</p>
-
-<div class="codetabs">
-  <div data-lang="java">
-
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span><span 
class="n">Count</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="mi">100</span><span class="o">)).</span><span 
class="na">every</span><span class="o">(</span><span 
class="err">…</span><span class="o">).</span><span 
class="na">groupBy</span><span class="o">(</span><span 
class="n">groupingField</span><span class="o">).</span><span 
class="na">max</span><span class="o">(</span><span class="n">field</span><span 
class="o">);</span></code></pre></div>
-
-  </div>
-
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dataStream</span><span 
class="o">.</span><span class="n">window</span><span class="o">(</span><span 
class="nc">Count</span><span class="o">.</span><span class="n">of</span><span 
class="o">(</span><span class="mi">100</span><span class="o">)).</span><span 
class="n">every</span><span class="o">(</span><span class="err">…</span><span 
class="o">).</span><span class="n">groupBy</span><span class="o">(</span><span 
class="n">groupingField</span><span class="o">).</span><span 
class="n">max</span><span class="o">(</span><span class="n">field</span><span 
class="o">)</span></code></pre></div>
-
-  </div>
-
-</div>
-
-<p>Using this approach we took the last 100 elements, divided it into groups 
by key, and then applied the aggregation. To create fixed size windows for 
every key, we need to bring the groupBy call before the window call. So to take 
the max for the last 100 elements in each group:</p>
-
-<div class="codetabs">
-  <div data-lang="java">
-
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span 
class="na">groupBy</span><span class="o">(</span><span 
class="n">groupingField</span><span class="o">).</span><span 
class="na">window</span><span class="o">(</span><span 
class="n">Count</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="mi">100</span><span class="o">)).</span><span 
class="na">every</span><span class="o">(</span><span 
class="err">…</span><span class="o">).</span><span class="na">max</span><span 
class="o">(</span><span class="n">field</span><span 
class="o">);</span></code></pre></div>
-
-  </div>
-
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dataStream</span><span 
class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span 
class="n">groupingField</span><span class="o">).</span><span 
class="n">window</span><span class="o">(</span><span 
class="nc">Count</span><span class="o">.</span><span class="n">of</span><span 
class="o">(</span><span class="mi">100</span><span class="o">)).</span><span 
class="n">every</span><span class="o">(</span><span class="err">…</span><span 
class="o">).</span><span class="n">max</span><span class="o">(</span><span 
class="n">field</span><span class="o">)</span></code></pre></div>
-
-  </div>
-
-</div>
-
-<p>This will create separate windows for different keys and apply the trigger 
and eviction policies on a per group basis.</p>
-
-<h4 id="applying-multiple-transformations-on-a-window">Applying multiple 
transformations on a window</h4>
-<p>Using the <code>WindowedDataStream</code> abstraction we can apply several 
transformations one after another on the discretised streams without having to 
re-discretise it:</p>
-
-<div class="codetabs">
-  <div data-lang="java">
-
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span><span 
class="n">Count</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="mi">1000</span><span class="o">)).</span><span 
class="na">groupBy</span><span class="o">(</span><span 
class="n">firstKey</span><span class="o">).</span><span 
class="na">mapWindow</span><span class="o">(</span><span 
class="err">…</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">groupBy</span><span 
class="o">(</span><span class="n">secondKey</span><span 
class="o">).</span><span class="na">reduceWindow</span><span 
class="o">(</span><span class="err">…</span><span class="o">).</span><span 
class="na">flatten</span><span class="o">();</span></code></pre></div>
-
-  </div>
-
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dataStream</span><span 
class="o">.</span><span class="n">window</span><span class="o">(</span><span 
class="nc">Count</span><span class="o">.</span><span class="n">of</span><span 
class="o">(</span><span class="mi">1000</span><span class="o">)).</span><span 
class="n">groupBy</span><span class="o">(</span><span 
class="n">firstKey</span><span class="o">).</span><span 
class="n">mapWindow</span><span class="o">(</span><span 
class="err">…</span><span class="o">)</span>
-    <span class="o">.</span><span class="n">groupBy</span><span 
class="o">(</span><span class="n">secondKey</span><span 
class="o">).</span><span class="n">reduceWindow</span><span 
class="o">(</span><span class="err">…</span><span class="o">).</span><span 
class="n">flatten</span><span class="o">()</span></code></pre></div>
-
-  </div>
-</div>
-
-<p>The above call would create global windows of 1000 elements, group them by 
the first key, and then apply a mapWindow transformation. The resulting 
windowed stream will then be grouped by the second key and further reduced. The 
results of the reduce transformation are then flattened.</p>
-
-<p>Notice that here we only defined the window size once at the beginning of 
the transformation. This means that anything that happens afterwards 
(<code>groupBy(firstKey).mapWindow(…).groupBy(secondKey).reduceWindow(…)</code>)
 happens inside the 1000 element windows. Of course the mapWindow might reduce 
the number of elements, but the key idea is that each transformation still 
corresponds to the same 1000 elements in the original stream.</p>
-
-<h4 id="periodic-aggregations-on-the-full-stream-history">Periodic 
aggregations on the full stream history</h4>
-<p>Sometimes it is necessary to aggregate over all the previously seen data in 
the stream. For this purpose either use the 
<code>dataStream.window(FullStream.window()).every(trigger)</code> or 
equivalently <code>dataStream.every(trigger)</code>.</p>
-
-<h4 id="global-vs-local-discretisation">Global vs local discretisation</h4>
-<p>By default all window discretisation calls 
(<code>dataStream.window(…)</code>) define global windows meaning that a 
global window of count 100 will contain the last 100 elements arrived at the 
discretisation operator in order. In most cases (except for Time) this means 
that the operator doing the actual discretisation needs to have a parallelism 
of 1 to be able to correctly execute the discretisation logic.</p>
-
-<p>Sometimes it is sufficient to create local discretisations, which allows 
the discretiser to run in parallel and apply the given discretisation logic at 
every discretiser instance. To allow local discretisation use the 
<code>local()</code> method of the windowed data stream.</p>
-
-<p>For example, <code>dataStream.window(Count.of(100)).maxBy(field)</code> 
would create global windows of 100 elements (Count discretises with parallelism 
of 1) and return the record with the max value by the selected field; 
alternatively the 
<code>dataStream.window(Count.of(100)).local().maxBy(field)</code> would create 
several count discretisers (as defined by the environment parallelism) and 
compute the max values accordingly.</p>
-
-<h3 id="temporal-database-style-operators">Temporal database style 
operators</h3>
-
-<p>While database style operators like joins (on key) and crosses are hard to 
define properly on data streams, a straightforward interpretation is to apply 
these operators on windows of the data streams.</p>
-
-<p>Currently join and cross operators are supported only on time windows. We 
are working on alleviating this limitation in the next release.</p>
-
-<p>Temporal operators take the current windows of both streams and apply the 
join/cross logic on these window pairs.</p>
-
-<p>The Join transformation produces a new Tuple DataStream with two fields. 
Each tuple holds a joined element of the first input DataStream in the first 
tuple field and a matching element of the second input DataStream in the second 
field for the current window. The user can also supply a custom join function 
to control the produced elements.</p>
-
-<p>The following code shows a default Join transformation using field position 
keys:</p>
-
-<div class="codetabs">
-  <div data-lang="java">
-
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">dataStream1</span><span 
class="o">.</span><span class="na">join</span><span class="o">(</span><span 
class="n">dataStream2</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">onWindow</span><span 
class="o">(</span><span class="n">windowing_params</span><span 
class="o">)</span>
-    <span class="o">.</span><span class="na">where</span><span 
class="o">(</span><span class="n">key_in_first</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">equalTo</span><span 
class="o">(</span><span class="n">key_in_second</span><span 
class="o">);</span></code></pre></div>
-
-  </div>
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dataStream1</span><span 
class="o">.</span><span class="n">join</span><span class="o">(</span><span 
class="n">dataStream2</span><span class="o">)</span>
-    <span class="o">.</span><span class="n">onWindow</span><span 
class="o">(</span><span class="n">windowing_params</span><span 
class="o">)</span>
-    <span class="o">.</span><span class="n">where</span><span 
class="o">(</span><span class="n">key_in_first</span><span class="o">)</span>
-    <span class="o">.</span><span class="n">equalTo</span><span 
class="o">(</span><span class="n">key_in_second</span><span 
class="o">)</span></code></pre></div>
-
-  </div>
-</div>
-
-<p>The Cross transformation combines two <code>DataStream</code>s into one 
<code>DataStream</code>. It builds all pairwise combinations of the elements of 
both input DataStreams in the current window, i.e., it builds a temporal 
Cartesian product. The user can also supply a custom cross function to control 
the produced elements</p>
-
-<div class="codetabs">
-  <div data-lang="java">
-
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">dataStream1</span><span 
class="o">.</span><span class="na">cross</span><span class="o">(</span><span 
class="n">dataStream2</span><span class="o">).</span><span 
class="na">onWindow</span><span class="o">(</span><span 
class="n">windowing_params</span><span class="o">);</span></code></pre></div>
-
-  </div>
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dataStream1</span> <span 
class="n">cross</span> <span class="n">dataStream2</span> <span 
class="n">onWindow</span> <span class="o">(</span><span 
class="n">windowing_params</span><span class="o">)</span></code></pre></div>
-
-  </div>
-</div>
-
-<h3 id="co-operators">Co operators</h3>
-
-<p>Co operators allow the users to jointly transform two 
<code>DataStream</code>s of different types, providing a simple way to jointly 
manipulate streams with a shared state. It is designed to support joint stream 
transformations where union is not appropriate due to different data types, or 
in case the user needs explicit tracking of the origin of individual elements.
-Co operators can be applied to <code>ConnectedDataStream</code>s which 
represent two <code>DataStream</code>s of possibly different types. A 
<code>ConnectedDataStream</code> can be created by calling the 
<code>connect(otherDataStream)</code> method of a <code>DataStream</code>.</p>
-
-<h4 id="map-on-connecteddatastream">Map on ConnectedDataStream</h4>
-<p>Applies a CoMap transformation on two separate DataStreams, mapping them to 
a common output type. The transformation calls a 
<code>CoMapFunction.map1()</code> for each element of the first input and 
<code>CoMapFunction.map2()</code> for each element of the second input. Each 
CoMapFunction call returns exactly one element.
-A CoMap operator that outputs true if an Integer value is received and false 
if a String value is received:</p>
-
-<div class="codetabs">
-  <div data-lang="java">
-
-   

<TRUNCATED>

Reply via email to