http://git-wip-us.apache.org/repos/asf/flink-web/blob/48be7c6f/content/blog/feed.xml
----------------------------------------------------------------------
diff --git a/content/blog/feed.xml b/content/blog/feed.xml
index ef1a75c..4724a31 100644
--- a/content/blog/feed.xml
+++ b/content/blog/feed.xml
@@ -7,6 +7,19 @@
 <atom:link href="http://flink.apache.org/blog/feed.xml" rel="self" 
type="application/rss+xml" />
 
 <item>
+<title>Flink Forward 2016 Call for Submissions Is Now Open</title>
+<description>&lt;p&gt;We are happy to announce that the call for submissions 
for Flink Forward 2016 is now open! The conference will take place September 
12-14, 2016 in Berlin, Germany, bringing together the open source stream 
processing community. Most Apache Flink committers will attend the conference, 
making it the ideal venue to learn more about the project and its roadmap and 
connect with the community.&lt;/p&gt;
+
+&lt;p&gt;The conference welcomes submissions on everything Flink-related, 
including experiences with using Flink, products based on Flink, technical 
talks on extending Flink, as well as connecting Flink with other open source or 
proprietary software.&lt;/p&gt;
+
+&lt;p&gt;Read more &lt;a 
href=&quot;http://flink-forward.org/&quot;&gt;here&lt;/a&gt;.&lt;/p&gt;
+</description>
+<pubDate>Thu, 14 Apr 2016 12:00:00 +0200</pubDate>
+<link>http://flink.apache.org/news/2016/04/14/flink-forward-announce.html</link>
+<guid isPermaLink="true">/news/2016/04/14/flink-forward-announce.html</guid>
+</item>
+
+<item>
 <title>Introducing Complex Event Processing (CEP) with Apache Flink</title>
 <description>&lt;p&gt;With the ubiquity of sensor networks and smart devices 
continuously collecting more and more data, we face the challenge to analyze an 
ever growing stream of data in near real-time. 
 Being able to react quickly to changing trends or to deliver up to date 
business intelligence can be a decisive factor for a company’s success or 
failure. 
@@ -128,7 +141,7 @@ Our pattern select function generates for each matching 
pattern a &lt;code&gt;Te
         &lt;span class=&quot;n&quot;&gt;TemperatureEvent&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;first&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;TemperatureEvent&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;pattern&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;get&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;First Event&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
         &lt;span class=&quot;n&quot;&gt;TemperatureEvent&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;second&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;TemperatureEvent&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;pattern&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;get&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;Second Event&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
 
-        &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;TemperatureWarning&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;
+        &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;TemperatureWarning&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;
             &lt;span class=&quot;n&quot;&gt;first&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getRackID&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; 
             &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;first&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getTemperature&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;+&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;second&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getTemperature&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;())&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;/&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
@@ -170,7 +183,7 @@ Thus, we will only generate a 
&lt;code&gt;TemperatureAlert&lt;/code&gt; if and o
         &lt;span class=&quot;n&quot;&gt;TemperatureWarning&lt;/span&gt; 
&lt;span class=&quot;n&quot;&gt;second&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;pattern&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;get&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;Second Event&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
 
         &lt;span class=&quot;k&quot;&gt;if&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;first&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getAverageTemperature&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;second&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getAverageTemperature&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;())&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
-            &lt;span class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;collect&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;TemperatureAlert&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;first&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getRackID&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()));&lt;/span&gt;
+            &lt;span class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;collect&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;TemperatureAlert&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;first&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getRackID&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()));&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
     &lt;span 
class=&quot;o&quot;&gt;});&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
 
@@ -272,7 +285,7 @@ This feature will allow to prune unpromising event 
sequences early.&lt;/p&gt;
 
 <item>
 <title>Announcing Apache Flink 1.0.0</title>
-<description>&lt;p&gt;The Apache Flink community is pleased to announce the 
availability of the 1.0.0 release. The community put significant effort into 
improving and extending Apache Flink since the last release, focusing on 
improving the experience of writing and executing data stream processing 
pipelines in production. &lt;/p&gt;
+<description>&lt;p&gt;The Apache Flink community is pleased to announce the 
availability of the 1.0.0 release. The community put significant effort into 
improving and extending Apache Flink since the last release, focusing on 
improving the experience of writing and executing data stream processing 
pipelines in production.&lt;/p&gt;
 
 &lt;center&gt;
 &lt;img src=&quot;/img/blog/flink-1.0.png&quot; 
style=&quot;height:200px;margin:15px&quot; /&gt;
@@ -313,7 +326,7 @@ When using this backend, active state in streaming programs 
can grow well beyond
 
 &lt;p&gt;The checkpointing has been extended by a more fine-grained control 
mechanism: In previous versions, new checkpoints were triggered independent of 
the speed at which old checkpoints completed. This can lead to situations where 
new checkpoints are piling up, because they are triggered too 
frequently.&lt;/p&gt;
 
-&lt;p&gt;The checkpoint coordinator now exposes statistics through our REST 
monitoring API and the web interface. Users can review the checkpoint size and 
duration on a per-operator basis and see the last completed checkpoints. This 
is helpful for identifying performance issues, such as processing slowdown by 
the checkpoints. &lt;/p&gt;
+&lt;p&gt;The checkpoint coordinator now exposes statistics through our REST 
monitoring API and the web interface. Users can review the checkpoint size and 
duration on a per-operator basis and see the last completed checkpoints. This 
is helpful for identifying performance issues, such as processing slowdown by 
the checkpoints.&lt;/p&gt;
 
 &lt;h2 
id=&quot;improved-kafka-connector-and-support-for-kafka-09&quot;&gt;Improved 
Kafka connector and support for Kafka 0.9&lt;/h2&gt;
 
@@ -699,13 +712,13 @@ For this, you only need to replace the dependency 
&lt;code&gt;storm-core&lt;/cod
 First, the program is assembled the Storm way without any code change to 
Spouts, Bolts, or the topology itself.&lt;/p&gt;
 
 &lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span class=&quot;c1&quot;&gt;// assemble 
topology, the Storm way&lt;/span&gt;
-&lt;span class=&quot;n&quot;&gt;TopologyBuilder&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;builder&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;TopologyBuilder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
-&lt;span class=&quot;n&quot;&gt;builder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setSpout&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;source&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StormFileSpout&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;inputFilePath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
-&lt;span class=&quot;n&quot;&gt;builder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setBolt&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;tokenizer&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StormBoltTokenizer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;())&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;TopologyBuilder&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;builder&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;TopologyBuilder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;builder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setSpout&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;source&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StormFileSpout&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;inputFilePath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;builder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setBolt&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;tokenizer&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StormBoltTokenizer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;())&lt;/span&gt;
        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;shuffleGrouping&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;source&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
-&lt;span class=&quot;n&quot;&gt;builder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setBolt&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;counter&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StormBoltCounter&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;())&lt;/span&gt;
-       &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;fieldsGrouping&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;tokenizer&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Fields&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;word&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
-&lt;span class=&quot;n&quot;&gt;builder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setBolt&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;sink&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StormBoltFileSink&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;outputFilePath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;))&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;builder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setBolt&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;counter&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StormBoltCounter&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;())&lt;/span&gt;
+       &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;fieldsGrouping&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;tokenizer&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Fields&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;word&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;builder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setBolt&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;sink&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StormBoltFileSink&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;outputFilePath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;))&lt;/span&gt;
        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;shuffleGrouping&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;counter&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
 
 &lt;p&gt;In order to execute the topology, we need to translate it to a 
&lt;code&gt;FlinkTopology&lt;/code&gt; and submit it to a local or remote Flink 
cluster, very similar to submitting the application to a Storm 
cluster.&lt;sup&gt;&lt;a href=&quot;#fn1&quot; 
id=&quot;ref1&quot;&gt;1&lt;/a&gt;&lt;/sup&gt;&lt;/p&gt;
@@ -714,7 +727,7 @@ First, the program is assembled the Storm way without any 
code change to Spouts,
 &lt;span class=&quot;c1&quot;&gt;// replaces: StormTopology topology = 
builder.createTopology();&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;FlinkTopology&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;topology&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;FlinkTopology&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;createTopology&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;builder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
 
-&lt;span class=&quot;n&quot;&gt;Config&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;conf&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Config&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;Config&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;conf&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Config&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
 &lt;span class=&quot;k&quot;&gt;if&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;runLocal&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
        &lt;span class=&quot;c1&quot;&gt;// use FlinkLocalCluster instead of 
LocalCluster&lt;/span&gt;
        &lt;span class=&quot;n&quot;&gt;FlinkLocalCluster&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;cluster&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;FlinkLocalCluster&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getLocalCluster&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
@@ -754,14 +767,14 @@ As Storm is type agnostic, it is required to specify the 
output type of embedded
 &lt;span class=&quot;c1&quot;&gt;// use Spout as source&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;source&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; 
   &lt;span class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;c1&quot;&gt;// Flink 
provided wrapper including original Spout&lt;/span&gt;
-                &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;SpoutWrapper&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;FileSpout&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;localFilePath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)),&lt;/span&gt; 
+                &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;SpoutWrapper&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;FileSpout&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;localFilePath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)),&lt;/span&gt; 
                 &lt;span class=&quot;c1&quot;&gt;// specify output type 
manually&lt;/span&gt;
                 &lt;span 
class=&quot;n&quot;&gt;TypeExtractor&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getForObject&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Tuple1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)));&lt;/span&gt;
 &lt;span class=&quot;c1&quot;&gt;// FileSpout cannot be 
parallelized&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;text&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;source&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setParallelism&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
 
 &lt;span class=&quot;c1&quot;&gt;// further processing with Flink&lt;/span&gt;
-&lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;tokens&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;flatMap&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Tokenizer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;keyBy&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;tokens&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;flatMap&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Tokenizer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;keyBy&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
 
 &lt;span class=&quot;c1&quot;&gt;// use Bolt for counting&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;counts&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt;
@@ -769,7 +782,7 @@ As Storm is type agnostic, it is required to specify the 
output type of embedded
                    &lt;span class=&quot;c1&quot;&gt;// specify output type 
manually&lt;/span&gt;
                    &lt;span 
class=&quot;n&quot;&gt;TypeExtractor&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getForObject&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;))&lt;/span&gt;
                    &lt;span class=&quot;c1&quot;&gt;// Flink provided wrapper 
including original Bolt&lt;/span&gt;
-                   &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;BoltWrapper&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&amp;gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;BoltCounter&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()));&lt;/span&gt;
+                   &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;BoltWrapper&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&amp;gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;BoltCounter&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()));&lt;/span&gt;
 
 &lt;span class=&quot;c1&quot;&gt;// write result to file via Flink 
sink&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;counts&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;writeAsText&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;outputPath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
@@ -807,7 +820,7 @@ While you can embed Spouts/Bolts in a Flink program and 
mix-and-match them with
 
 <item>
 <title>Introducing Stream Windows in Apache Flink</title>
-<description>&lt;p&gt;The data analysis space is witnessing an evolution from 
batch to stream processing for many use cases. Although batch can be handled as 
a special case of stream processing, analyzing never-ending streaming data 
often requires a shift in the mindset and comes with its own terminology (for 
example, “windowing” and “at-least-once”/”exactly-once” 
processing). This shift and the new terminology can be quite confusing for 
people being new to the space of stream processing. Apache Flink is a 
production-ready stream processor with an easy-to-use yet very expressive API 
to define advanced stream analysis programs. Flink’s API features very 
flexible window definitions on data streams which let it stand out among other 
open source stream processors. &lt;/p&gt;
+<description>&lt;p&gt;The data analysis space is witnessing an evolution from 
batch to stream processing for many use cases. Although batch can be handled as 
a special case of stream processing, analyzing never-ending streaming data 
often requires a shift in the mindset and comes with its own terminology (for 
example, “windowing” and “at-least-once”/”exactly-once” 
processing). This shift and the new terminology can be quite confusing for 
people being new to the space of stream processing. Apache Flink is a 
production-ready stream processor with an easy-to-use yet very expressive API 
to define advanced stream analysis programs. Flink’s API features very 
flexible window definitions on data streams which let it stand out among other 
open source stream processors.&lt;/p&gt;
 
 &lt;p&gt;In this blog post, we discuss the concept of windows for stream 
processing, present Flink’s built-in windows, and explain its support for 
custom windowing semantics.&lt;/p&gt;
 
@@ -870,17 +883,17 @@ While you can embed Spouts/Bolts in a Flink program and 
mix-and-match them with
 
 &lt;p&gt;There is one aspect that we haven’t discussed yet, namely the exact 
meaning of “&lt;em&gt;collects elements for one minute&lt;/em&gt;” which 
boils down to the question, “&lt;em&gt;How does the stream processor 
interpret time?&lt;/em&gt;”.&lt;/p&gt;
 
-&lt;p&gt;Apache Flink features three different notions of time, namely 
&lt;em&gt;processing time&lt;/em&gt;, &lt;em&gt;event time&lt;/em&gt;, and 
&lt;em&gt;ingestion time&lt;/em&gt;. &lt;/p&gt;
+&lt;p&gt;Apache Flink features three different notions of time, namely 
&lt;em&gt;processing time&lt;/em&gt;, &lt;em&gt;event time&lt;/em&gt;, and 
&lt;em&gt;ingestion time&lt;/em&gt;.&lt;/p&gt;
 
 &lt;ol&gt;
-  &lt;li&gt;In &lt;strong&gt;processing time&lt;/strong&gt;, windows are 
defined with respect to the wall clock of the machine that builds and processes 
a window, i.e., a one minute processing time window collects elements for 
exactly one minute. &lt;/li&gt;
-  &lt;li&gt;In &lt;strong&gt;event time&lt;/strong&gt;, windows are defined 
with respect to timestamps that are attached to each event record. This is 
common for many types of events, such as log entries, sensor data, etc, where 
the timestamp usually represents the time at which the event occurred. Event 
time has several benefits over processing time. First of all, it decouples the 
program semantics from the actual serving speed of the source and the 
processing performance of system. Hence you can process historic data, which is 
served at maximum speed, and continuously produced data with the same program. 
It also prevents semantically incorrect results in case of backpressure or 
delays due to failure recovery. Second, event time windows compute correct 
results, even if events arrive out-of-order of their timestamp which is common 
if a data stream gathers events from distributed sources. &lt;/li&gt;
+  &lt;li&gt;In &lt;strong&gt;processing time&lt;/strong&gt;, windows are 
defined with respect to the wall clock of the machine that builds and processes 
a window, i.e., a one minute processing time window collects elements for 
exactly one minute.&lt;/li&gt;
+  &lt;li&gt;In &lt;strong&gt;event time&lt;/strong&gt;, windows are defined 
with respect to timestamps that are attached to each event record. This is 
common for many types of events, such as log entries, sensor data, etc, where 
the timestamp usually represents the time at which the event occurred. Event 
time has several benefits over processing time. First of all, it decouples the 
program semantics from the actual serving speed of the source and the 
processing performance of system. Hence you can process historic data, which is 
served at maximum speed, and continuously produced data with the same program. 
It also prevents semantically incorrect results in case of backpressure or 
delays due to failure recovery. Second, event time windows compute correct 
results, even if events arrive out-of-order of their timestamp which is common 
if a data stream gathers events from distributed sources.&lt;/li&gt;
   &lt;li&gt;&lt;strong&gt;Ingestion time&lt;/strong&gt; is a hybrid of 
processing and event time. It assigns wall clock timestamps to records as soon 
as they arrive in the system (at the source) and continues processing with 
event time semantics based on the attached timestamps.&lt;/li&gt;
 &lt;/ol&gt;
 
 &lt;h2 id=&quot;count-windows&quot;&gt;Count Windows&lt;/h2&gt;
 
-&lt;p&gt;Apache Flink also features count windows. A tumbling count window of 
100 will collect 100 events in a window and evaluate the window when the 100th 
element has been added. &lt;/p&gt;
+&lt;p&gt;Apache Flink also features count windows. A tumbling count window of 
100 will collect 100 events in a window and evaluate the window when the 100th 
element has been added.&lt;/p&gt;
 
 &lt;p&gt;In Flink’s DataStream API, tumbling and sliding count windows are 
defined as follows:&lt;/p&gt;
 
@@ -903,7 +916,7 @@ While you can embed Spouts/Bolts in a Flink program and 
mix-and-match them with
 
 &lt;h2 id=&quot;dissecting-flinks-windowing-mechanics&quot;&gt;Dissecting 
Flink’s windowing mechanics&lt;/h2&gt;
 
-&lt;p&gt;Flink’s built-in time and count windows cover a wide range of 
common window use cases. However, there are of course applications that require 
custom windowing logic that cannot be addressed by Flink’s built-in windows. 
In order to support also applications that need very specific windowing 
semantics, the DataStream API exposes interfaces for the internals of its 
windowing mechanics. These interfaces give very fine-grained control about the 
way that windows are built and evaluated. &lt;/p&gt;
+&lt;p&gt;Flink’s built-in time and count windows cover a wide range of 
common window use cases. However, there are of course applications that require 
custom windowing logic that cannot be addressed by Flink’s built-in windows. 
In order to support also applications that need very specific windowing 
semantics, the DataStream API exposes interfaces for the internals of its 
windowing mechanics. These interfaces give very fine-grained control about the 
way that windows are built and evaluated.&lt;/p&gt;
 
 &lt;p&gt;The following figure depicts Flink’s windowing mechanism and 
introduces the components being involved.&lt;/p&gt;
 
@@ -1025,7 +1038,7 @@ While you can embed Spouts/Bolts in a Flink program and 
mix-and-match them with
 <title>Announcing Apache Flink 0.10.0</title>
 <description>&lt;p&gt;The Apache Flink community is pleased to announce the 
availability of the 0.10.0 release. The community put significant effort into 
improving and extending Apache Flink since the last release, focusing on data 
stream processing and operational features. About 80 contributors provided bug 
fixes, improvements, and new features such that in total more than 400 JIRA 
issues could be resolved.&lt;/p&gt;
 
-&lt;p&gt;For Flink 0.10.0, the focus of the community was to graduate the 
DataStream API from beta and to evolve Apache Flink into a production-ready 
stream data processor with a competitive feature set. These efforts resulted in 
support for event-time and out-of-order streams, exactly-once guarantees in the 
case of failures, a very flexible windowing mechanism, sophisticated operator 
state management, and a highly-available cluster operation mode. Flink 0.10.0 
also brings a new monitoring dashboard with real-time system and job monitoring 
capabilities. Both batch and streaming modes of Flink benefit from the new high 
availability and improved monitoring features. Needless to say that Flink 
0.10.0 includes many more features, improvements, and bug fixes. &lt;/p&gt;
+&lt;p&gt;For Flink 0.10.0, the focus of the community was to graduate the 
DataStream API from beta and to evolve Apache Flink into a production-ready 
stream data processor with a competitive feature set. These efforts resulted in 
support for event-time and out-of-order streams, exactly-once guarantees in the 
case of failures, a very flexible windowing mechanism, sophisticated operator 
state management, and a highly-available cluster operation mode. Flink 0.10.0 
also brings a new monitoring dashboard with real-time system and job monitoring 
capabilities. Both batch and streaming modes of Flink benefit from the new high 
availability and improved monitoring features. Needless to say that Flink 
0.10.0 includes many more features, improvements, and bug fixes.&lt;/p&gt;
 
 &lt;p&gt;We encourage everyone to &lt;a 
href=&quot;/downloads.html&quot;&gt;download the release&lt;/a&gt; and &lt;a 
href=&quot;https://ci.apache.org/projects/flink/flink-docs-release-0.10/&quot;&gt;check
 out the documentation&lt;/a&gt;. Feedback through the Flink &lt;a 
href=&quot;/community.html#mailing-lists&quot;&gt;mailing lists&lt;/a&gt; is, 
as always, very welcome!&lt;/p&gt;
 
@@ -1244,7 +1257,7 @@ Also note that some methods in the DataStream API had to 
be renamed as part of t
 
 &lt;h2 id=&quot;the-off-heap-memory-implementation&quot;&gt;The off-heap 
Memory Implementation&lt;/h2&gt;
 
-&lt;p&gt;Given that all memory intensive internal algorithms are already 
implemented against the &lt;code&gt;MemorySegment&lt;/code&gt;, our 
implementation to switch to off-heap memory is actually trivial. You can 
compare it to replacing all 
&lt;code&gt;ByteBuffer.allocate(numBytes)&lt;/code&gt; calls with 
&lt;code&gt;ByteBuffer.allocateDirect(numBytes)&lt;/code&gt;. In Flink’s case 
it meant that we made the &lt;code&gt;MemorySegment&lt;/code&gt; abstract and 
added the &lt;code&gt;HeapMemorySegment&lt;/code&gt; and 
&lt;code&gt;OffHeapMemorySegment&lt;/code&gt; subclasses. The 
&lt;code&gt;OffHeapMemorySegment&lt;/code&gt; takes the off-heap memory pointer 
from a &lt;code&gt;java.nio.DirectByteBuffer&lt;/code&gt; and implements its 
specialized access methods using &lt;code&gt;sun.misc.Unsafe&lt;/code&gt;. We 
also made a few adjustments to the startup scripts and the deployment code to 
make sure that the JVM is permitted enough off-heap memory (direct memory, 
&lt;em&gt;-XX:MaxDirectMemorySize&lt;/em&gt;). &lt;/p&gt;
+&lt;p&gt;Given that all memory intensive internal algorithms are already 
implemented against the &lt;code&gt;MemorySegment&lt;/code&gt;, our 
implementation to switch to off-heap memory is actually trivial. You can 
compare it to replacing all 
&lt;code&gt;ByteBuffer.allocate(numBytes)&lt;/code&gt; calls with 
&lt;code&gt;ByteBuffer.allocateDirect(numBytes)&lt;/code&gt;. In Flink’s case 
it meant that we made the &lt;code&gt;MemorySegment&lt;/code&gt; abstract and 
added the &lt;code&gt;HeapMemorySegment&lt;/code&gt; and 
&lt;code&gt;OffHeapMemorySegment&lt;/code&gt; subclasses. The 
&lt;code&gt;OffHeapMemorySegment&lt;/code&gt; takes the off-heap memory pointer 
from a &lt;code&gt;java.nio.DirectByteBuffer&lt;/code&gt; and implements its 
specialized access methods using &lt;code&gt;sun.misc.Unsafe&lt;/code&gt;. We 
also made a few adjustments to the startup scripts and the deployment code to 
make sure that the JVM is permitted enough off-heap memory (direct memory, 
&lt;em&gt;-XX:MaxDirectMemorySize&lt;/em&gt;).&lt;/p&gt;
 
 &lt;p&gt;In practice we had to go one step further, to make the implementation 
perform well. While the &lt;code&gt;ByteBuffer&lt;/code&gt; is used in I/O code 
paths to compose headers and move bulk memory into place, the MemorySegment is 
part of the innermost loops of many algorithms (sorting, hash tables, …). 
That means that the access methods have to be as fast as possible.&lt;/p&gt;
 
@@ -2268,21 +2281,21 @@ and mutations as well as neighborhood 
aggregations.&lt;/p&gt;
 
 &lt;h4 id=&quot;common-graph-metrics&quot;&gt;Common Graph Metrics&lt;/h4&gt;
 &lt;p&gt;These methods can be used to retrieve several graph metrics and 
properties, such as the number
-of vertices, edges and the node degrees. &lt;/p&gt;
+of vertices, edges and the node degrees.&lt;/p&gt;
 
 &lt;h4 id=&quot;transformations&quot;&gt;Transformations&lt;/h4&gt;
 &lt;p&gt;The transformation methods enable several Graph operations, using 
high-level functions similar to
 the ones provided by the batch processing API. These transformations can be 
applied one after the
-other, yielding a new Graph after each step, in a fashion similar to operators 
on DataSets: &lt;/p&gt;
+other, yielding a new Graph after each step, in a fashion similar to operators 
on DataSets:&lt;/p&gt;
 
-&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span 
class=&quot;n&quot;&gt;inputGraph&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getUndirected&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;().&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;mapEdges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;CustomEdgeMapper&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span 
class=&quot;n&quot;&gt;inputGraph&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getUndirected&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;().&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;mapEdges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;CustomEdgeMapper&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
 
 &lt;p&gt;Transformations can be applied on:&lt;/p&gt;
 
 &lt;ol&gt;
-  &lt;li&gt;&lt;strong&gt;Vertices&lt;/strong&gt;: 
&lt;code&gt;mapVertices&lt;/code&gt;, 
&lt;code&gt;joinWithVertices&lt;/code&gt;, 
&lt;code&gt;filterOnVertices&lt;/code&gt;, &lt;code&gt;addVertex&lt;/code&gt;, 
…  &lt;/li&gt;
-  &lt;li&gt;&lt;strong&gt;Edges&lt;/strong&gt;: 
&lt;code&gt;mapEdges&lt;/code&gt;, &lt;code&gt;filterOnEdges&lt;/code&gt;, 
&lt;code&gt;removeEdge&lt;/code&gt;, …   &lt;/li&gt;
-  &lt;li&gt;&lt;strong&gt;Triplets&lt;/strong&gt; (source vertex, target 
vertex, edge): &lt;code&gt;getTriplets&lt;/code&gt;  &lt;/li&gt;
+  &lt;li&gt;&lt;strong&gt;Vertices&lt;/strong&gt;: 
&lt;code&gt;mapVertices&lt;/code&gt;, 
&lt;code&gt;joinWithVertices&lt;/code&gt;, 
&lt;code&gt;filterOnVertices&lt;/code&gt;, &lt;code&gt;addVertex&lt;/code&gt;, 
…&lt;/li&gt;
+  &lt;li&gt;&lt;strong&gt;Edges&lt;/strong&gt;: 
&lt;code&gt;mapEdges&lt;/code&gt;, &lt;code&gt;filterOnEdges&lt;/code&gt;, 
&lt;code&gt;removeEdge&lt;/code&gt;, …&lt;/li&gt;
+  &lt;li&gt;&lt;strong&gt;Triplets&lt;/strong&gt; (source vertex, target 
vertex, edge): &lt;code&gt;getTriplets&lt;/code&gt;&lt;/li&gt;
 &lt;/ol&gt;
 
 &lt;h4 id=&quot;neighborhood-aggregations&quot;&gt;Neighborhood 
Aggregations&lt;/h4&gt;
@@ -2311,7 +2324,7 @@ one or more values per vertex, the more general  
&lt;code&gt;groupReduceOnEdges(
 &lt;p&gt;Assume you would want to compute the sum of the values of all 
incoming neighbors for each vertex.
 We will call the &lt;code&gt;reduceOnNeighbors()&lt;/code&gt; aggregation 
method since the sum is an associative and commutative operation and the 
neighbors’ values are needed:&lt;/p&gt;
 
-&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span 
class=&quot;n&quot;&gt;graph&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;reduceOnNeighbors&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;SumValues&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;EdgeDirection&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;IN&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span 
class=&quot;n&quot;&gt;graph&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;reduceOnNeighbors&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;SumValues&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;EdgeDirection&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;IN&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
 
 &lt;p&gt;The vertex with id 1 is the only node that has no incoming edges. The 
result is therefore:&lt;/p&gt;
 
@@ -2416,7 +2429,7 @@ vertex values do not need to be recomputed during an 
iteration.&lt;/p&gt;
 &lt;p&gt;Let us reconsider the Single Source Shortest Paths algorithm. In each 
iteration, a vertex:&lt;/p&gt;
 
 &lt;ol&gt;
-  &lt;li&gt;&lt;strong&gt;Gather&lt;/strong&gt; retrieves distances from its 
neighbors summed up with the corresponding edge values; &lt;/li&gt;
+  &lt;li&gt;&lt;strong&gt;Gather&lt;/strong&gt; retrieves distances from its 
neighbors summed up with the corresponding edge values;&lt;/li&gt;
   &lt;li&gt;&lt;strong&gt;Sum&lt;/strong&gt; compares the newly obtained 
distances in order to extract the minimum;&lt;/li&gt;
   &lt;li&gt;&lt;strong&gt;Apply&lt;/strong&gt; and finally adopts the minimum 
distance computed in the sum step,
 provided that it is lower than its current value. If a vertex’s value does 
not change during
@@ -2475,7 +2488,7 @@ plays that each song has. We then filter out the list of 
songs the users do not
 playlist. Then we compute the top songs per user (i.e. the songs a user 
listened to the most).
 Finally, as a separate use-case on the same data set, we create a user-user 
similarity graph based
 on the common songs and use this resulting graph to detect communities by 
calling Gelly’s Label Propagation
-library method. &lt;/p&gt;
+library method.&lt;/p&gt;
 
 &lt;p&gt;For running the example implementation, please use the 0.10-SNAPSHOT 
version of Flink as a
 dependency. The full example code base can be found &lt;a 
href=&quot;https://github.com/apache/flink/blob/master/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java&quot;&gt;here&lt;/a&gt;.
 The public data set used for testing
@@ -2497,7 +2510,7 @@ playlist, we use a coGroup function to filter out the 
mismatches.&lt;/p&gt;
 &lt;span class=&quot;c1&quot;&gt;// read the mismatches dataset and extract 
the songIDs&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataSet&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple3&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;validTriplets&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;triplets&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;coGroup&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;mismatches&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;where&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;equalTo&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
-        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;with&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;CoGroupFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
+        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;with&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;CoGroupFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                 &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;coGroup&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Iterable&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;triplets&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Iterable&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;invalidSongs&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Collector&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                         &lt;span class=&quot;k&quot;&gt;if&lt;/span&gt; 
&lt;span class=&quot;o&quot;&gt;(!&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;invalidSongs&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;iterator&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;().&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;hasNext&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;())&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                             &lt;span class=&quot;k&quot;&gt;for&lt;/span&gt; 
&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple3&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;triplet&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;triplets&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt; &lt;span class=&quot;c1&quot;&gt;// valid 
triplet&lt;/span&gt;
@@ -2535,7 +2548,7 @@ basically iterate through the edge value and collect the 
target (song) of the ma
 
 &lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span class=&quot;c1&quot;&gt;//get the 
top track (most listened to) for each user&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataSet&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;usersWithTopTrack&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;userSongGraph&lt;/span&gt;
-        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;groupReduceOnEdges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;GetTopSongPerUser&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;EdgeDirection&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;OUT&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;groupReduceOnEdges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;GetTopSongPerUser&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;EdgeDirection&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;OUT&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
 
 &lt;span class=&quot;kd&quot;&gt;class&lt;/span&gt; &lt;span 
class=&quot;nc&quot;&gt;GetTopSongPerUser&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;implements&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;EdgesFunctionWithVertexValue&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
     &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;iterateEdges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Vertex&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;vertex&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Iterable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Edge&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;edges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
@@ -2548,7 +2561,7 @@ basically iterate through the edge value and collect the 
target (song) of the ma
                 &lt;span class=&quot;n&quot;&gt;topSong&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;edge&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getTarget&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
             &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
-        &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;vertex&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getId&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;topSong&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+        &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;vertex&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getId&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;topSong&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
 &lt;span 
class=&quot;o&quot;&gt;}&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
 
@@ -2565,10 +2578,10 @@ in the figure below.&lt;/p&gt;
 
 &lt;p&gt;To form the user-user graph in Flink, we will simply take the edges 
from the user-song graph
 (left-hand side of the image), group them by song-id, and then add all the 
users (source vertex ids)
-to an ArrayList. &lt;/p&gt;
+to an ArrayList.&lt;/p&gt;
 
 &lt;p&gt;We then match users who listened to the same song two by two, 
creating a new edge to mark their
-common interest (right-hand side of the image). &lt;/p&gt;
+common interest (right-hand side of the image).&lt;/p&gt;
 
 &lt;p&gt;Afterwards, we perform a &lt;code&gt;distinct()&lt;/code&gt; 
operation to avoid creation of duplicate data.
 Considering that we now have the DataSet of edges which present interest, 
creating a graph is as
@@ -2584,14 +2597,14 @@ straightforward as a call to the 
&lt;code&gt;Graph.fromDataSet()&lt;/code&gt; me
                 &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;})&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;groupBy&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
-        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;reduceGroup&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;GroupReduceFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
+        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;reduceGroup&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;GroupReduceFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                 &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;reduce&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Iterable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Edge&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;edges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Collector&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Edge&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
-                    &lt;span class=&quot;n&quot;&gt;List&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;ArrayList&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
+                    &lt;span class=&quot;n&quot;&gt;List&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;ArrayList&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
                     &lt;span class=&quot;k&quot;&gt;for&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Edge&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;edge&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;edges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                         &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;add&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;edge&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;
                         &lt;span class=&quot;k&quot;&gt;for&lt;/span&gt; 
&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;kt&quot;&gt;int&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;i&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;i&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;size&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;-&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;i&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;++)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                             &lt;span class=&quot;k&quot;&gt;for&lt;/span&gt; 
&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;kt&quot;&gt;int&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;j&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;i&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;+&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;j&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;size&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;-&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;j&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;++)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
-                                &lt;span 
class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;collect&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Edge&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;get&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;i&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;get&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;j&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)));&lt;/span&gt;
+                                &lt;span 
class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;collect&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Edge&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;get&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;i&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;get&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;j&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)));&lt;/span&gt;
                             &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
                         &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
                     &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
@@ -2607,7 +2620,7 @@ formed. To do so, we first initialize each vertex with a 
numeric label using the
 the id of a vertex with the first element of the tuple, afterwards applying a 
map function.
 Finally, we call the &lt;code&gt;run()&lt;/code&gt; method with the 
LabelPropagation library method passed
 as a parameter. In the end, the vertices will be updated to contain the most 
frequent label
-among their neighbors. &lt;/p&gt;
+among their neighbors.&lt;/p&gt;
 
 &lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span class=&quot;c1&quot;&gt;// detect 
user communities using label propagation&lt;/span&gt;
 &lt;span class=&quot;c1&quot;&gt;// initialize each vertex with a unique 
numeric label&lt;/span&gt;
@@ -2622,12 +2635,12 @@ among their neighbors. &lt;/p&gt;
 
 &lt;span class=&quot;c1&quot;&gt;// update the vertex values and run the label 
propagation algorithm&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataSet&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Vertex&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;verticesWithCommunity&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;similarUsersGraph&lt;/span&gt;
-        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;joinWithVertices&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;idsWithlLabels&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;MapFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
+        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;joinWithVertices&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;idsWithlLabels&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;MapFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                 &lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Long&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;map&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;idWithLabel&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                     &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; 
&lt;span class=&quot;n&quot;&gt;idWithLabel&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;f1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;;&lt;/span&gt;
                 &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;})&lt;/span&gt;
-        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;run&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;LabelPropagation&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;numIterations&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;))&lt;/span&gt;
+        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;run&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;LabelPropagation&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;numIterations&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;))&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getVertices&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
 
 &lt;p&gt;&lt;a href=&quot;#top&quot;&gt;Back to top&lt;/a&gt;&lt;/p&gt;
@@ -2637,10 +2650,10 @@ among their neighbors. &lt;/p&gt;
 &lt;p&gt;Currently, Gelly matches the basic functionalities provided by most 
state-of-the-art graph
 processing systems. Our vision is to turn Gelly into more than “yet another 
library for running
 PageRank-like algorithms” by supporting generic iterations, implementing 
graph partitioning,
-providing bipartite graph support and by offering numerous other features. 
&lt;/p&gt;
+providing bipartite graph support and by offering numerous other 
features.&lt;/p&gt;
 
 &lt;p&gt;We are also enriching Flink Gelly with a set of operators suitable 
for highly skewed graphs
-as well as a Graph API built on Flink Streaming. &lt;/p&gt;
+as well as a Graph API built on Flink Streaming.&lt;/p&gt;
 
 &lt;p&gt;In the near future, we would like to see how Gelly can be integrated 
with graph visualization
 tools, graph database systems and sampling techniques.&lt;/p&gt;
@@ -2898,7 +2911,7 @@ tools, graph database systems and sampling 
techniques.&lt;/p&gt;
 
 <item>
 <title>April 2015 in the Flink community</title>
-<description>&lt;p&gt;April was an packed month for Apache Flink. &lt;/p&gt;
+<description>&lt;p&gt;April was an packed month for Apache Flink.&lt;/p&gt;
 
 &lt;h3 id=&quot;flink-runner-for-google-cloud-dataflow&quot;&gt;Flink runner 
for Google Cloud Dataflow&lt;/h3&gt;
 
@@ -2924,7 +2937,7 @@ including Apache Flink.&lt;/p&gt;
 
 &lt;h2 id=&quot;flink-on-the-web&quot;&gt;Flink on the web&lt;/h2&gt;
 
-&lt;p&gt;Fabian Hueske gave an &lt;a 
href=&quot;http://www.infoq.com/news/2015/04/hueske-apache-flink?utm_campaign=infoq_content&amp;amp;utm_source=infoq&amp;amp;utm_medium=feed&amp;amp;utm_term=global&quot;&gt;interview
 at InfoQ&lt;/a&gt; on Apache Flink. &lt;/p&gt;
+&lt;p&gt;Fabian Hueske gave an &lt;a 
href=&quot;http://www.infoq.com/news/2015/04/hueske-apache-flink?utm_campaign=infoq_content&amp;amp;utm_source=infoq&amp;amp;utm_medium=feed&amp;amp;utm_term=global&quot;&gt;interview
 at InfoQ&lt;/a&gt; on Apache Flink.&lt;/p&gt;
 
 &lt;h2 id=&quot;upcoming-events&quot;&gt;Upcoming events&lt;/h2&gt;
 
@@ -2956,7 +2969,7 @@ However, this approach has a few notable drawbacks. First 
of all it is not trivi
 &lt;img src=&quot;/img/blog/memory-mgmt.png&quot; 
style=&quot;width:90%;margin:15px&quot; /&gt;
 &lt;/center&gt;
 
-&lt;p&gt;Flink’s style of active memory management and operating on binary 
data has several benefits: &lt;/p&gt;
+&lt;p&gt;Flink’s style of active memory management and operating on binary 
data has several benefits:&lt;/p&gt;
 
 &lt;ol&gt;
   &lt;li&gt;&lt;strong&gt;Memory-safe execution &amp;amp; efficient 
out-of-core algorithms.&lt;/strong&gt; Due to the fixed amount of allocated 
memory segments, it is trivial to monitor remaining memory resources. In case 
of memory shortage, processing operators can efficiently write larger batches 
of memory segments to disk and later them read back. Consequently, 
&lt;code&gt;OutOfMemoryErrors&lt;/code&gt; are effectively prevented.&lt;/li&gt;
@@ -2965,13 +2978,13 @@ However, this approach has a few notable drawbacks. 
First of all it is not trivi
   &lt;li&gt;&lt;strong&gt;Efficient binary operations &amp;amp; cache 
sensitivity.&lt;/strong&gt; Binary data can be efficiently compared and 
operated on given a suitable binary representation. Furthermore, the binary 
representations can put related values, as well as hash codes, keys, and 
pointers, adjacently into memory. This gives data structures with usually more 
cache efficient access patterns.&lt;/li&gt;
 &lt;/ol&gt;
 
-&lt;p&gt;These properties of active memory management are very desirable in a 
data processing systems for large-scale data analytics but have a significant 
price tag attached. Active memory management and operating on binary data is 
not trivial to implement, i.e., using 
&lt;code&gt;java.util.HashMap&lt;/code&gt; is much easier than implementing a 
spillable hash-table backed by byte arrays and a custom serialization stack. Of 
course Apache Flink is not the only JVM-based data processing system that 
operates on serialized binary data. Projects such as &lt;a 
href=&quot;http://drill.apache.org/&quot;&gt;Apache Drill&lt;/a&gt;, &lt;a 
href=&quot;http://ignite.incubator.apache.org/&quot;&gt;Apache Ignite 
(incubating)&lt;/a&gt; or &lt;a 
href=&quot;http://projectgeode.org/&quot;&gt;Apache Geode 
(incubating)&lt;/a&gt; apply similar techniques and it was recently announced 
that also &lt;a href=&quot;http://spark.apache.org/&quot;&gt;Apache 
Spark&lt;/a&gt; will evolve into this direction with &lt;a 
href=&quot;https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html&quot;&gt;Project
 Tungsten&lt;/a&gt;. &lt;/p&gt;
+&lt;p&gt;These properties of active memory management are very desirable in a 
data processing systems for large-scale data analytics but have a significant 
price tag attached. Active memory management and operating on binary data is 
not trivial to implement, i.e., using 
&lt;code&gt;java.util.HashMap&lt;/code&gt; is much easier than implementing a 
spillable hash-table backed by byte arrays and a custom serialization stack. Of 
course Apache Flink is not the only JVM-based data processing system that 
operates on serialized binary data. Projects such as &lt;a 
href=&quot;http://drill.apache.org/&quot;&gt;Apache Drill&lt;/a&gt;, &lt;a 
href=&quot;http://ignite.incubator.apache.org/&quot;&gt;Apache Ignite 
(incubating)&lt;/a&gt; or &lt;a 
href=&quot;http://projectgeode.org/&quot;&gt;Apache Geode 
(incubating)&lt;/a&gt; apply similar techniques and it was recently announced 
that also &lt;a href=&quot;http://spark.apache.org/&quot;&gt;Apache 
Spark&lt;/a&gt; will evolve into this direction with &lt;a 
href=&quot;https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html&quot;&gt;Project
 Tungsten&lt;/a&gt;.&lt;/p&gt;
 
 &lt;p&gt;In the following we discuss in detail how Flink allocates memory, 
de/serializes objects, and operates on binary data. We will also show some 
performance numbers comparing processing objects on the heap and operating on 
binary data.&lt;/p&gt;
 
 &lt;h2 id=&quot;how-does-flink-allocate-memory&quot;&gt;How does Flink 
allocate memory?&lt;/h2&gt;
 
-&lt;p&gt;A Flink worker, called TaskManager, is composed of several internal 
components such as an actor system for coordination with the Flink master, an 
IOManager that takes care of spilling data to disk and reading it back, and a 
MemoryManager that coordinates memory usage. In the context of this blog post, 
the MemoryManager is of most interest. &lt;/p&gt;
+&lt;p&gt;A Flink worker, called TaskManager, is composed of several internal 
components such as an actor system for coordination with the Flink master, an 
IOManager that takes care of spilling data to disk and reading it back, and a 
MemoryManager that coordinates memory usage. In the context of this blog post, 
the MemoryManager is of most interest.&lt;/p&gt;
 
 &lt;p&gt;The MemoryManager takes care of allocating, accounting, and 
distributing MemorySegments to data processing operators such as sort and join 
operators. A &lt;a 
href=&quot;https://github.com/apache/flink/blob/release-0.9.0-milestone-1/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java&quot;&gt;MemorySegment&lt;/a&gt;
 is Flink’s distribution unit of memory and is backed by a regular Java byte 
array (size is 32 KB by default). A MemorySegment provides very efficient write 
and read access to its backed byte array using Java’s unsafe methods. You can 
think of a MemorySegment as a custom-tailored version of Java’s NIO 
ByteBuffer. In order to operate on multiple MemorySegments like on a larger 
chunk of consecutive memory, Flink uses logical views that implement Java’s 
&lt;code&gt;java.io.DataOutput&lt;/code&gt; and 
&lt;code&gt;java.io.DataInput&lt;/code&gt; interfaces.&lt;/p&gt;
 
@@ -2983,7 +2996,7 @@ However, this approach has a few notable drawbacks. First 
of all it is not trivi
 
 &lt;h2 id=&quot;how-does-flink-serialize-objects&quot;&gt;How does Flink 
serialize objects?&lt;/h2&gt;
 
-&lt;p&gt;The Java ecosystem offers several libraries to convert objects into a 
binary representation and back. Common alternatives are standard Java 
serialization, &lt;a 
href=&quot;https://github.com/EsotericSoftware/kryo&quot;&gt;Kryo&lt;/a&gt;, 
&lt;a href=&quot;http://avro.apache.org/&quot;&gt;Apache Avro&lt;/a&gt;, &lt;a 
href=&quot;http://thrift.apache.org/&quot;&gt;Apache Thrift&lt;/a&gt;, or 
Google’s &lt;a 
href=&quot;https://github.com/google/protobuf&quot;&gt;Protobuf&lt;/a&gt;. 
Flink includes its own custom serialization framework in order to control the 
binary representation of data. This is important because operating on binary 
data such as comparing or even manipulating binary data requires exact 
knowledge of the serialization layout. Further, configuring the serialization 
layout with respect to operations that are performed on binary data can yield a 
significant performance boost. Flink’s serialization stack also leverages the 
fact, that the type of the objects which 
 are going through de/serialization are exactly known before a program is 
executed. &lt;/p&gt;
+&lt;p&gt;The Java ecosystem offers several libraries to convert objects into a 
binary representation and back. Common alternatives are standard Java 
serialization, &lt;a 
href=&quot;https://github.com/EsotericSoftware/kryo&quot;&gt;Kryo&lt;/a&gt;, 
&lt;a href=&quot;http://avro.apache.org/&quot;&gt;Apache Avro&lt;/a&gt;, &lt;a 
href=&quot;http://thrift.apache.org/&quot;&gt;Apache Thrift&lt;/a&gt;, or 
Google’s &lt;a 
href=&quot;https://github.com/google/protobuf&quot;&gt;Protobuf&lt;/a&gt;. 
Flink includes its own custom serialization framework in order to control the 
binary representation of data. This is important because operating on binary 
data such as comparing or even manipulating binary data requires exact 
knowledge of the serialization layout. Further, configuring the serialization 
layout with respect to operations that are performed on binary data can yield a 
significant performance boost. Flink’s serialization stack also leverages the 
fact, that the type of the objects which 
 are going through de/serialization are exactly known before a program is 
executed.&lt;/p&gt;
 
 &lt;p&gt;Flink programs can process data represented as arbitrary Java or 
Scala objects. Before a program is optimized, the data types at each processing 
step of the program’s data flow need to be identified. For Java programs, 
Flink features a reflection-based type extraction component to analyze the 
return types of user-defined functions. Scala programs are analyzed with help 
of the Scala compiler. Flink represents each data type with a &lt;a 
href=&quot;https://github.com/apache/flink/blob/release-0.9.0-milestone-1/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java&quot;&gt;TypeInformation&lt;/a&gt;.
 Flink has TypeInformations for several kinds of data types, 
including:&lt;/p&gt;
 
@@ -2993,11 +3006,11 @@ However, this approach has a few notable drawbacks. 
First of all it is not trivi
   &lt;li&gt;WritableTypeInfo: Any implementation of Hadoop’s Writable 
interface.&lt;/li&gt;
   &lt;li&gt;TupleTypeInfo: Any Flink tuple (Tuple1 to Tuple25). Flink tuples 
are Java representations for fixed-length tuples with typed fields.&lt;/li&gt;
   &lt;li&gt;CaseClassTypeInfo: Any Scala CaseClass (including Scala 
tuples).&lt;/li&gt;
-  &lt;li&gt;PojoTypeInfo: Any POJO (Java or Scala), i.e., an object with all 
fields either being public or accessible through getters and setter that follow 
the common naming conventions. &lt;/li&gt;
+  &lt;li&gt;PojoTypeInfo: Any POJO (Java or Scala), i.e., an object with all 
fields either being public or accessible through getters and setter that follow 
the common naming conventions.&lt;/li&gt;
   &lt;li&gt;GenericTypeInfo: Any data type that cannot be identified as 
another type.&lt;/li&gt;
 &lt;/ul&gt;
 
-&lt;p&gt;Each TypeInformation provides a serializer for the data type it 
represents. For example, a BasicTypeInfo returns a serializer that writes the 
respective primitive type, the serializer of a WritableTypeInfo delegates 
de/serialization to the write() and readFields() methods of the object 
implementing Hadoop’s Writable interface, and a GenericTypeInfo returns a 
serializer that delegates serialization to Kryo. Object serialization to a 
DataOutput which is backed by Flink MemorySegments goes automatically through 
Java’s efficient unsafe operations. For data types that can be used as keys, 
i.e., compared and hashed, the TypeInformation provides TypeComparators. 
TypeComparators compare and hash objects and can - depending on the concrete 
data type - also efficiently compare binary representations and extract 
fixed-length binary key prefixes. &lt;/p&gt;
+&lt;p&gt;Each TypeInformation provides a serializer for the data type it 
represents. For example, a BasicTypeInfo returns a serializer that writes the 
respective primitive type, the serializer of a WritableTypeInfo delegates 
de/serialization to the write() and readFields() methods of the object 
implementing Hadoop’s Writable interface, and a GenericTypeInfo returns a 
serializer that delegates serialization to Kryo. Object serialization to a 
DataOutput which is backed by Flink MemorySegments goes automatically through 
Java’s efficient unsafe operations. For data types that can be used as keys, 
i.e., compared and hashed, the TypeInformation provides TypeComparators. 
TypeComparators compare and hash objects and can - depending on the concrete 
data type - also efficiently compare binary representations and extract 
fixed-length binary key prefixes.&lt;/p&gt;
 
 &lt;p&gt;Tuple, Pojo, and CaseClass types are composite types, i.e., 
containers for one or more possibly nested data types. As such, their 
serializers and comparators are also composite and delegate the serialization 
and comparison of their member data types to the respective serializers and 
comparators. The following figure illustrates the serialization of a (nested) 
&lt;code&gt;Tuple3&amp;lt;Integer, Double, Person&amp;gt;&lt;/code&gt; object 
where &lt;code&gt;Person&lt;/code&gt; is a POJO and defined as 
follows:&lt;/p&gt;
 
@@ -3010,13 +3023,13 @@ However, this approach has a few notable drawbacks. 
First of all it is not trivi
 &lt;img src=&quot;/img/blog/data-serialization.png&quot; 
style=&quot;width:80%;margin:15px&quot; /&gt;
 &lt;/center&gt;
 
-&lt;p&gt;Flink’s type system can be easily extended by providing custom 
TypeInformations, Serializers, and Comparators to improve the performance of 
serializing and comparing custom data types. &lt;/p&gt;
+&lt;p&gt;Flink’s type system can be easily extended by providing custom 
TypeInformations, Serializers, and Comparators to improve the performance of 
serializing and comparing custom data types.&lt;/p&gt;
 
 &lt;h2 id=&quot;how-does-flink-operate-on-binary-data&quot;&gt;How does Flink 
operate on binary data?&lt;/h2&gt;
 
 &lt;p&gt;Similar to many other data processing APIs (including SQL), Flink’s 
APIs provide transformations to group, sort, and join data sets. These 
transformations operate on potentially very large data sets. Relational 
database systems feature very efficient algorithms for these purposes since 
several decades including external merge-sort, merge-join, and hybrid 
hash-join. Flink builds on this technology, but generalizes it to handle 
arbitrary objects using its custom serialization and comparison stack. In the 
following, we show how Flink operates with binary data by the example of 
Flink’s in-memory sort algorithm.&lt;/p&gt;
 
-&lt;p&gt;Flink assigns a memory budget to its data processing operators. Upon 
initialization, a sort algorithm requests its memory budget from the 
MemoryManager and receives a corresponding set of MemorySegments. The set of 
MemorySegments becomes the memory pool of a so-called sort buffer which 
collects the data that is be sorted. The following figure illustrates how data 
objects are serialized into the sort buffer. &lt;/p&gt;
+&lt;p&gt;Flink assigns a memory budget to its data processing operators. Upon 
initialization, a sort algorithm requests its memory budget from the 
MemoryManager and receives a corresponding set of MemorySegments. The set of 
MemorySegments becomes the memory pool of a so-called sort buffer which 
collects the data that is be sorted. The following figure illustrates how data 
objects are serialized into the sort buffer.&lt;/p&gt;
 
 &lt;center&gt;
 &lt;img src=&quot;/img/blog/sorting-binary-data-1.png&quot; 
style=&quot;width:90%;margin:15px&quot; /&gt;
@@ -3029,7 +3042,7 @@ The following figure shows how two objects are 
compared.&lt;/p&gt;
 &lt;img src=&quot;/img/blog/sorting-binary-data-2.png&quot; 
style=&quot;width:80%;margin:15px&quot; /&gt;
 &lt;/center&gt;
 
-&lt;p&gt;The sort buffer compares two elements by comparing their binary 
fix-length sort keys. The comparison is successful if either done on a full key 
(not a prefix key) or if the binary prefix keys are not equal. If the prefix 
keys are equal (or the sort key data type does not provide a binary prefix 
key), the sort buffer follows the pointers to the actual object data, 
deserializes both objects and compares the objects. Depending on the result of 
the comparison, the sort algorithm decides whether to swap the compared 
elements or not. The sort buffer swaps two elements by moving their fix-length 
keys and pointers. The actual data is not moved. Once the sort algorithm 
finishes, the pointers in the sort buffer are correctly ordered. The following 
figure shows how the sorted data is returned from the sort buffer. &lt;/p&gt;
+&lt;p&gt;The sort buffer compares two elements by comparing their binary 
fix-length sort keys. The comparison is successful if either done on a full key 
(not a prefix key) or if the binary prefix keys are not equal. If the prefix 
keys are equal (or the sort key data type does not provide a binary prefix 
key), the sort buffer follows the pointers to the actual object data, 
deserializes both objects and compares the objects. Depending on the result of 
the comparison, the sort algorithm decides whether to swap the compared 
elements or not. The sort buffer swaps two elements by moving their fix-length 
keys and pointers. The actual data is not moved. Once the sort algorithm 
finishes, the pointers in the sort buffer are correctly ordered. The following 
figure shows how the sorted data is returned from the sort buffer.&lt;/p&gt;
 
 &lt;center&gt;
 &lt;img src=&quot;/img/blog/sorting-binary-data-3.png&quot; 
style=&quot;width:80%;margin:15px&quot; /&gt;
@@ -3047,7 +3060,7 @@ The following figure shows how two objects are 
compared.&lt;/p&gt;
   &lt;li&gt;&lt;strong&gt;Kryo-serialized.&lt;/strong&gt; The tuple fields are 
serialized into a sort buffer of 600 MB size using Kryo serialization and 
sorted without binary sort keys. This means that each pair-wise comparison 
requires two object to be deserialized.&lt;/li&gt;
 &lt;/ol&gt;
 
-&lt;p&gt;All sort methods are implemented using a single thread. The reported 
times are averaged over ten runs. After each run, we call 
&lt;code&gt;System.gc()&lt;/code&gt; to request a garbage collection run which 
does not go into measured execution time. The following figure shows the time 
to store the input data in memory, sort it, and read it back as objects. 
&lt;/p&gt;
+&lt;p&gt;All sort methods are implemented using a single thread. The reported 
times are averaged over ten runs. After each run, we call 
&lt;code&gt;System.gc()&lt;/code&gt; to request a garbage collection run which 
does not go into measured execution time. The following figure shows the time 
to store the input data in memory, sort it, and read it back as 
objects.&lt;/p&gt;
 
 &lt;center&gt;
 &lt;img src=&quot;/img/blog/sort-benchmark.png&quot; 
style=&quot;width:90%;margin:15px&quot; /&gt;
@@ -3105,13 +3118,13 @@ The following figure shows how two objects are 
compared.&lt;/p&gt;
 
 &lt;p&gt;&lt;br /&gt;&lt;/p&gt;
 
-&lt;p&gt;To summarize, the experiments verify the previously stated benefits 
of operating on binary data. &lt;/p&gt;
+&lt;p&gt;To summarize, the experiments verify the previously stated benefits 
of operating on binary data.&lt;/p&gt;
 
 &lt;h2 id=&quot;were-not-done-yet&quot;&gt;We’re not done yet!&lt;/h2&gt;
 
-&lt;p&gt;Apache Flink features quite a bit of advanced techniques to safely 
and efficiently process huge amounts of data with limited memory resources. 
However, there are a few points that could make Flink even more efficient. The 
Flink community is working on moving the managed memory to off-heap memory. 
This will allow for smaller JVMs, lower garbage collection overhead, and also 
easier system configuration. With Flink’s Table API, the semantics of all 
operations such as aggregations and projections are known (in contrast to 
black-box user-defined functions). Hence we can generate code for Table API 
operations that directly operates on binary data. Further improvements include 
serialization layouts which are tailored towards the operations that are 
applied on the binary data and code generation for serializers and comparators. 
&lt;/p&gt;
+&lt;p&gt;Apache Flink features quite a bit of advanced techniques to safely 
and efficiently process huge amounts of data with limited memory resources. 
However, there are a few points that could make Flink even more efficient. The 
Flink community is working on moving the managed memory to off-heap memory. 
This will allow for smaller JVMs, 

<TRUNCATED>
