http://git-wip-us.apache.org/repos/asf/flink-web/blob/a16dddeb/content/blog/feed.xml
----------------------------------------------------------------------
diff --git a/content/blog/feed.xml b/content/blog/feed.xml
new file mode 100644
index 0000000..d3374f9
--- /dev/null
+++ b/content/blog/feed.xml
@@ -0,0 +1,2553 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom">
+<channel>
+<title>Flink Blog Feed</title>
+<description>Flink Blog</description>
+<link>http://flink.apache.org/blog</link>
+<atom:link href="http://flink.apache.org/blog/feed.xml" rel="self" type="application/rss+xml" />
+
+<item>
+<title>Announcing Flink 0.9.0-milestone1 preview release</title>
+<description>&lt;p&gt;The Apache Flink community is pleased to announce the
+availability of the 0.9.0-milestone-1 release. The release is a preview
+of the upcoming 0.9.0 release and already contains many of its new
+features. Interested users are encouraged to try it out and give
+feedback. As the version number indicates, this is a preview release
+that contains known issues.&lt;/p&gt;
+
+&lt;p&gt;You can download the release
+&lt;a 
href=&quot;http://flink.apache.org/downloads.html#preview&quot;&gt;here&lt;/a&gt;
 and check out the
+latest documentation
+&lt;a 
href=&quot;http://ci.apache.org/projects/flink/flink-docs-master/&quot;&gt;here&lt;/a&gt;.
 Feedback
+through the Flink &lt;a 
href=&quot;http://flink.apache.org/community.html#mailing-lists&quot;&gt;mailing
+lists&lt;/a&gt; is, as
+always, very welcome!&lt;/p&gt;
+
+&lt;h2 id=&quot;new-features&quot;&gt;New Features&lt;/h2&gt;
+
+&lt;h3 id=&quot;table-api&quot;&gt;Table API&lt;/h3&gt;
+
+&lt;p&gt;Flink’s new Table API offers a higher-level abstraction for
+interacting with structured data sources. The Table API allows users
+to execute logical, SQL-like queries on distributed data sets while
+allowing them to freely mix declarative queries with regular Flink
+operators. Here is an example that groups and joins two tables:&lt;/p&gt;
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-scala&quot; data-lang=&quot;scala&quot;&gt;&lt;span 
class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;clickCounts&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;clicks&lt;/span&gt;
+  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;groupBy&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;user&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;select&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;userId&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;url&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;count&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;as&lt;/span&gt; &lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+
+&lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;activeUsers&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;join&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;clickCounts&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;where&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;id&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;===&lt;/span&gt; &lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;userId&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;count&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;10&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;select&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;username&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;...)&lt;/span&gt;
+&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+&lt;p&gt;Tables consist of logical attributes that can be selected by name
+rather than by physical Java or Scala data types. This eliminates a lot
+of the boilerplate code of common ETL tasks and raises the level of
+abstraction of Flink programs. Tables are available for both static and
+streaming data sources (DataSet and DataStream APIs).&lt;/p&gt;
+
+&lt;p&gt;Check out the Table guide for Java and Scala
+&lt;a 
href=&quot;http://ci.apache.org/projects/flink/flink-docs-master/table.html&quot;&gt;here&lt;/a&gt;.&lt;/p&gt;
+
+&lt;h3 id=&quot;gelly-graph-processing-api&quot;&gt;Gelly Graph Processing 
API&lt;/h3&gt;
+
+&lt;p&gt;Gelly is a Java Graph API for Flink. It contains a set of utilities
+for graph analysis, support for iterative graph processing, and a
+library of graph algorithms. Gelly exposes a Graph data structure that
+wraps DataSets of vertices and edges. It provides methods for creating
+graphs from DataSets, graph transformations and utilities (e.g., in-
+and out-degrees of vertices), neighborhood aggregations, and iterative
+vertex-centric graph processing. Its library of common graph algorithms
+includes PageRank, SSSP, label propagation, and community
+detection.&lt;/p&gt;
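To picture the vertex-centric model Gelly provides, here is a tiny single-machine sketch of label propagation in plain Scala. This is illustrative only, not Gelly's API: in each superstep every vertex adopts the most frequent label among its neighbors.

```scala
// Illustrative sketch of vertex-centric label propagation (not Gelly code).
def propagateLabels(
    edges: Seq[(Int, Int)],   // undirected edges (src, dst)
    initial: Map[Int, Int],   // vertex -> initial label
    supersteps: Int): Map[Int, Int] = {
  // adjacency: each vertex sees its neighbors on both edge directions
  val neighbors = (edges ++ edges.map(_.swap))
    .groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }
  (1 to supersteps).foldLeft(initial) { (labels, _) =>
    labels.map { case (v, own) =>
      val neigh = neighbors.getOrElse(v, Seq.empty).map(labels)
      // adopt the most frequent neighbor label, smallest label on ties;
      // an isolated vertex keeps its own label
      val next =
        if (neigh.isEmpty) own
        else neigh.groupBy(identity).maxBy { case (l, occ) => (occ.size, -l) }._1
      v -> next
    }
  }
}
```

Gelly runs the same per-vertex logic as a delta iteration over distributed vertex and edge DataSets instead of a local fold.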
+
+&lt;p&gt;Gelly internally builds on top of Flink’s &lt;a href=&quot;http://ci.apache.org/projects/flink/flink-docs-master/iterations.html&quot;&gt;delta iterations&lt;/a&gt;.
+Iterative graph algorithms are executed leveraging mutable state,
+achieving performance similar to that of specialized graph processing
+systems.&lt;/p&gt;
+
+&lt;p&gt;Gelly will eventually subsume Spargel, Flink’s Pregel-like API. 
Check
+out the Gelly guide
+&lt;a 
href=&quot;http://ci.apache.org/projects/flink/flink-docs-master/gelly_guide.html&quot;&gt;here&lt;/a&gt;.&lt;/p&gt;
+
+&lt;h3 id=&quot;flink-machine-learning-library&quot;&gt;Flink Machine Learning 
Library&lt;/h3&gt;
+
+&lt;p&gt;This release includes the first version of Flink’s Machine Learning
+library. The library’s pipeline approach, which has been strongly
+inspired by scikit-learn’s abstraction of transformers and estimators,
+makes it easy to quickly set up a data processing pipeline and to get
+your job done.&lt;/p&gt;
+
+&lt;p&gt;Flink distinguishes between transformers and learners. Transformers
+are components which transform your input data into a new format
+allowing you to extract features, cleanse your data or to sample from
+it. Learners, on the other hand, are the components that take
+your input data and train a model on it. The model you obtain from the
+learner can then be evaluated and used to make predictions on unseen
+data.&lt;/p&gt;
+
+&lt;p&gt;Currently, the machine learning library contains transformers and
+learners for several tasks. It supports multiple linear regression,
+using a stochastic gradient descent implementation to scale to large
+data sizes. Furthermore, it includes an alternating least squares (ALS)
+implementation to factorize large matrices; this matrix factorization
+can be used for collaborative filtering. An implementation of the
+communication-efficient distributed dual coordinate ascent (CoCoA)
+algorithm is the latest addition to the library. CoCoA can be used to
+train distributed soft-margin SVMs.&lt;/p&gt;
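The transformer/learner split can be sketched with two plain Scala traits. The names and signatures below are illustrative only, not flink-ml's actual interfaces: a transformer maps data to data, a learner maps data to a model.

```scala
// Illustrative sketch of the transformer/learner split (not Flink's API).
trait Transformer {
  def transform(data: Seq[Vector[Double]]): Seq[Vector[Double]]
}
trait Learner[Model] {
  def fit(data: Seq[(Vector[Double], Double)]): Model
}

// A transformer that centers each feature to zero mean.
object MeanCenter extends Transformer {
  def transform(data: Seq[Vector[Double]]): Seq[Vector[Double]] = {
    val dims = data.head.indices
    val means = dims.map(d => data.map(_(d)).sum / data.size)
    data.map(row => row.indices.map(d => row(d) - means(d)).toVector)
  }
}

// A minimal "learner": fits y = a * x for 1-d inputs via least squares.
object SimpleRegression extends Learner[Double] {
  def fit(data: Seq[(Vector[Double], Double)]): Double = {
    val num = data.map { case (x, y) => x(0) * y }.sum
    val den = data.map { case (x, _) => x(0) * x(0) }.sum
    num / den
  }
}
```

Chaining a transformer's output into a learner's input is exactly the pipeline idea borrowed from scikit-learn.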
+
+&lt;h3 id=&quot;flink-on-yarn-leveraging-apache-tez&quot;&gt;Flink on YARN 
leveraging Apache Tez&lt;/h3&gt;
+
+&lt;p&gt;We are introducing a new execution mode that enables running
+restricted Flink programs on top of &lt;a href=&quot;http://tez.apache.org&quot;&gt;Apache Tez&lt;/a&gt;.
+This mode retains Flink’s APIs, optimizer, and runtime operators, but
+instead of wrapping those in Flink tasks that are executed by Flink
+TaskManagers, it wraps them in Tez runtime tasks and builds a Tez DAG
+that represents the program.&lt;/p&gt;
+
+&lt;p&gt;By using Flink on Tez, users have an additional choice for an
+execution platform for Flink programs. While Flink’s distributed
+runtime favors low latency, streaming shuffles, and iterative
+algorithms, Tez focuses on scalability and elastic resource usage in
+shared YARN clusters.&lt;/p&gt;
+
+&lt;p&gt;Get started with Flink on Tez
+&lt;a 
href=&quot;http://ci.apache.org/projects/flink/flink-docs-master/flink_on_tez_guide.html&quot;&gt;here&lt;/a&gt;.&lt;/p&gt;
+
+&lt;h3 id=&quot;reworked-distributed-runtime-on-akka&quot;&gt;Reworked 
Distributed Runtime on Akka&lt;/h3&gt;
+
+&lt;p&gt;Flink’s RPC system has been replaced by the widely adopted
+&lt;a href=&quot;http://akka.io&quot;&gt;Akka&lt;/a&gt; framework. Akka’s concurrency model offers the
+right abstraction to develop a fast and robust distributed system. By
+using Akka’s own failure detection mechanism, the stability of Flink’s
+runtime is significantly improved, because the system can now react
+properly to node outages. Furthermore, Akka improves Flink’s
+scalability by introducing asynchronous messaging to the system. These
+asynchronous messages allow Flink to run on many more nodes than
+before.&lt;/p&gt;
+
+&lt;h3 id=&quot;exactly-once-processing-on-kafka-streaming-sources&quot;&gt;Exactly-once processing on Kafka Streaming Sources&lt;/h3&gt;
+
+&lt;p&gt;This release introduces stream processing with exactly-once delivery
+guarantees for Flink streaming programs that analyze streaming sources
+persisted by &lt;a href=&quot;http://kafka.apache.org&quot;&gt;Apache Kafka&lt;/a&gt;. The
+system internally tracks the Kafka offsets to ensure that Flink can
+pick up data from Kafka where it left off in case of a failure.&lt;/p&gt;
+
+&lt;p&gt;Read
+&lt;a 
href=&quot;http://ci.apache.org/projects/flink/flink-docs-master/streaming_guide.html#apache-kafka&quot;&gt;here&lt;/a&gt;
+on how to use the persistent Kafka source.&lt;/p&gt;
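The offset-tracking idea behind these guarantees can be pictured with a small, self-contained sketch. Everything here is illustrative, not Flink's implementation: the consumer remembers the offset of the last completed checkpoint and, after a failure, rewinds to it so records are replayed rather than lost.

```scala
// Illustrative sketch of checkpoint-based offset tracking (not Flink code).
final class CheckpointedConsumer(log: IndexedSeq[String]) {
  private var offset = 0           // next record to read from the log
  private var lastCheckpoint = 0   // offset as of the last completed checkpoint

  def poll(): Option[String] =
    if (offset < log.length) { val r = log(offset); offset += 1; Some(r) }
    else None

  // Called when a checkpoint completes: the current offset is now durable.
  def checkpoint(): Unit = lastCheckpoint = offset

  // Called on failure recovery: rewind to the last completed checkpoint,
  // so records read after it are re-processed instead of silently dropped.
  def restore(): Unit = offset = lastCheckpoint
}
```

Exactly-once processing then follows from pairing this replay with checkpointed operator state, so replayed records do not produce duplicate effects.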
+
+&lt;h3 id=&quot;improved-yarn-support&quot;&gt;Improved YARN support&lt;/h3&gt;
+
+&lt;p&gt;Flink’s YARN client contains several improvements, such as a
+detached mode for starting a YARN session in the background and the
+ability to submit a single Flink job to a YARN cluster without starting
+a session, including a “fire and forget” mode. Flink is now also able
+to reallocate failed YARN containers to maintain the size of the
+requested cluster. This feature allows users to implement
+fault-tolerant setups on top of YARN. There is also an internal Java
+API to deploy and control a running YARN cluster. It is being used by
+system integrators to easily control Flink on YARN within their Hadoop
+2 clusters.&lt;/p&gt;
+
+&lt;p&gt;See the YARN docs
+&lt;a 
href=&quot;http://ci.apache.org/projects/flink/flink-docs-master/yarn_setup.html&quot;&gt;here&lt;/a&gt;.&lt;/p&gt;
+
+&lt;h2 id=&quot;more-improvements-and-fixes&quot;&gt;More Improvements and 
Fixes&lt;/h2&gt;
+
+&lt;ul&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1605&quot;&gt;FLINK-1605&lt;/a&gt;:
+Flink is not exposing its Guava and ASM dependencies to Maven
+projects depending on Flink. We use the maven-shade-plugin to
+relocate these dependencies into our own namespace. This allows
+users to use any Guava or ASM version.&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-1417&quot;&gt;FLINK-1417&lt;/a&gt;:
+Automatic recognition and registration of Java types with Kryo and the
+internal serializers: Flink has its own type handling and serialization
+framework, falling back to Kryo for types that it cannot handle. To get
+the best performance, Flink automatically registers all types a user is
+using in their program with Kryo. Flink also registers serializers for
+Protocol Buffers, Thrift, Avro and Joda-Time automatically. Users can
+also manually register serializers with Kryo
+(&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-1399&quot;&gt;FLINK-1399&lt;/a&gt;)&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1296&quot;&gt;FLINK-1296&lt;/a&gt;:
 Add
+support for sorting very large records&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1679&quot;&gt;FLINK-1679&lt;/a&gt;:
+&amp;quot;degreeOfParallelism&amp;quot; methods renamed to 
&amp;quot;parallelism&amp;quot;&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1501&quot;&gt;FLINK-1501&lt;/a&gt;:
 Add
+metrics library for monitoring TaskManagers&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1760&quot;&gt;FLINK-1760&lt;/a&gt;:
 Add
+support for building Flink with Scala 2.11&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1648&quot;&gt;FLINK-1648&lt;/a&gt;:
 Add
+a mode where the system automatically sets the parallelism to the
+available task slots&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1622&quot;&gt;FLINK-1622&lt;/a&gt;:
 Add
+groupCombine operator&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1589&quot;&gt;FLINK-1589&lt;/a&gt;:
 Add
+option to pass Configuration to LocalExecutor&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1504&quot;&gt;FLINK-1504&lt;/a&gt;:
 Add
+support for accessing secured HDFS clusters in standalone 
mode&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1478&quot;&gt;FLINK-1478&lt;/a&gt;:
 Add
+strictly local input split assignment&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1512&quot;&gt;FLINK-1512&lt;/a&gt;:
 Add
+CsvReader for reading into POJOs.&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1461&quot;&gt;FLINK-1461&lt;/a&gt;:
 Add
+sortPartition operator&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1450&quot;&gt;FLINK-1450&lt;/a&gt;:
 Add
+Fold operator to the Streaming api&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1389&quot;&gt;FLINK-1389&lt;/a&gt;:
+Allow setting custom file extensions for files created by the
+FileOutputFormat&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1236&quot;&gt;FLINK-1236&lt;/a&gt;:
 Add
+support for localization of Hadoop Input Splits&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1179&quot;&gt;FLINK-1179&lt;/a&gt;:
 Add
+button to JobManager web interface to request stack trace of a
+TaskManager&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1105&quot;&gt;FLINK-1105&lt;/a&gt;:
 Add
+support for locally sorted output&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1688&quot;&gt;FLINK-1688&lt;/a&gt;:
 Add
+socket sink&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://issues.apache.org/jira/browse/FLINK-1436&quot;&gt;FLINK-1436&lt;/a&gt;:
+Improve usability of command line interface&lt;/p&gt;&lt;/li&gt;
+&lt;/ul&gt;
+</description>
+<pubDate>Mon, 13 Apr 2015 12:00:00 +0200</pubDate>
+<link>http://flink.apache.org/news/2015/04/13/release-0.9.0-milestone1.html</link>
+<guid isPermaLink="true">/news/2015/04/13/release-0.9.0-milestone1.html</guid>
+</item>
+
+<item>
+<title>March 2015 in the Flink community</title>
+<description>&lt;p&gt;March has been a busy month in the Flink 
community.&lt;/p&gt;
+
+&lt;h3 id=&quot;flink-runner-for-google-cloud-dataflow&quot;&gt;Flink runner 
for Google Cloud Dataflow&lt;/h3&gt;
+
+&lt;p&gt;A Flink runner for Google Cloud Dataflow was announced. See the blog
+posts by &lt;a href=&quot;http://data-artisans.com/dataflow.html&quot;&gt;data 
Artisans&lt;/a&gt; and
+the &lt;a 
href=&quot;http://googlecloudplatform.blogspot.de/2015/03/announcing-Google-Cloud-Dataflow-runner-for-Apache-Flink.html&quot;&gt;Google
 Cloud Platform Blog&lt;/a&gt;.
+Google Cloud Dataflow programs can be written using an open-source
+SDK and run on multiple backends, either as a managed service inside
+Google&amp;#39;s infrastructure or leveraging open-source runners,
+including Apache Flink.&lt;/p&gt;
+
+&lt;h3 id=&quot;learn-about-the-internals-of-flink&quot;&gt;Learn about the 
internals of Flink&lt;/h3&gt;
+
+&lt;p&gt;The community has started an effort to better document the internals
+of Flink. Check out the first articles on the Flink wiki on &lt;a 
href=&quot;https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525&quot;&gt;how
 Flink
+manages
+memory&lt;/a&gt;,
+&lt;a 
href=&quot;https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks&quot;&gt;how
 tasks in Flink exchange
+data&lt;/a&gt;,
+&lt;a 
href=&quot;https://cwiki.apache.org/confluence/display/FLINK/Type+System%2C+Type+Extraction%2C+Serialization&quot;&gt;type
 extraction and serialization in
+Flink&lt;/a&gt;,
+as well as &lt;a 
href=&quot;https://cwiki.apache.org/confluence/display/FLINK/Akka+and+Actors&quot;&gt;how
 Flink builds on Akka for distributed
+coordination&lt;/a&gt;.&lt;/p&gt;
+
+&lt;p&gt;Check out also the &lt;a 
href=&quot;http://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html&quot;&gt;new
 blog
+post&lt;/a&gt;
+on how Flink executes joins with several insights into Flink&amp;#39;s 
runtime.&lt;/p&gt;
+
+&lt;h3 id=&quot;meetups-and-talks&quot;&gt;Meetups and talks&lt;/h3&gt;
+
+&lt;p&gt;Flink&amp;#39;s machine learning efforts were presented at the &lt;a 
href=&quot;http://www.meetup.com/Machine-Learning-Stockholm/events/221144997/&quot;&gt;Machine
+Learning Stockholm meetup
+group&lt;/a&gt;. The
+regular Berlin Flink meetup featured a talk on the past, present, and
+future of Flink. The talk is available on
+&lt;a 
href=&quot;https://www.youtube.com/watch?v=fw2DBE6ZiEQ&amp;amp;feature=youtu.be&quot;&gt;youtube&lt;/a&gt;.&lt;/p&gt;
+
+&lt;h2 id=&quot;in-the-flink-master&quot;&gt;In the Flink master&lt;/h2&gt;
+
+&lt;h3 id=&quot;table-api-in-scala-and-java&quot;&gt;Table API in Scala and 
Java&lt;/h3&gt;
+
+&lt;p&gt;The new &lt;a 
href=&quot;https://github.com/apache/flink/tree/master/flink-staging/flink-table&quot;&gt;Table
+API&lt;/a&gt;
+in Flink is now available in both Java and Scala. Check out the
+examples &lt;a 
href=&quot;https://github.com/apache/flink/blob/master/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java&quot;&gt;here
 (Java)&lt;/a&gt; and &lt;a 
href=&quot;https://github.com/apache/flink/tree/master/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala&quot;&gt;here
 (Scala)&lt;/a&gt;.&lt;/p&gt;
+
+&lt;h3 id=&quot;additions-to-the-machine-learning-library&quot;&gt;Additions 
to the Machine Learning library&lt;/h3&gt;
+
+&lt;p&gt;Flink&amp;#39;s &lt;a 
href=&quot;https://github.com/apache/flink/tree/master/flink-staging/flink-ml&quot;&gt;Machine
 Learning
+library&lt;/a&gt;
+is seeing quite a bit of traction. Recent additions include the &lt;a 
href=&quot;http://arxiv.org/abs/1409.1458&quot;&gt;CoCoA
+algorithm&lt;/a&gt; for distributed
+optimization.&lt;/p&gt;
+
+&lt;h3 
id=&quot;exactly-once-delivery-guarantees-for-streaming-jobs&quot;&gt;Exactly-once
 delivery guarantees for streaming jobs&lt;/h3&gt;
+
+&lt;p&gt;Flink streaming jobs now provide exactly-once processing guarantees
+when coupled with persistent sources (notably &lt;a 
href=&quot;http://kafka.apache.org&quot;&gt;Apache
+Kafka&lt;/a&gt;). Flink periodically checkpoints and
+persists the offsets of the sources and restarts from those
+checkpoints at failure recovery. This functionality is currently
+limited in that it does not yet handle large state and iterative
+programs.&lt;/p&gt;
+
+&lt;h3 id=&quot;flink-on-tez&quot;&gt;Flink on Tez&lt;/h3&gt;
+
+&lt;p&gt;A new execution environment enables non-iterative Flink jobs to use
+Tez as an execution backend instead of Flink&amp;#39;s own network stack. 
Learn more
+&lt;a 
href=&quot;http://ci.apache.org/projects/flink/flink-docs-master/flink_on_tez_guide.html&quot;&gt;here&lt;/a&gt;.&lt;/p&gt;
+</description>
+<pubDate>Tue, 07 Apr 2015 12:00:00 +0200</pubDate>
+<link>http://flink.apache.org/news/2015/04/07/march-in-flink.html</link>
+<guid isPermaLink="true">/news/2015/04/07/march-in-flink.html</guid>
+</item>
+
+<item>
+<title>Peeking into Apache Flink&#39;s Engine Room</title>
+<description>&lt;h3 id=&quot;join-processing-in-apache-flink&quot;&gt;Join 
Processing in Apache Flink&lt;/h3&gt;
+
+&lt;p&gt;Joins are prevalent operations in many data processing applications. 
Most data processing systems feature APIs that make joining data sets very 
easy. However, the internal algorithms for join processing are much more 
involved, especially if large data sets need to be handled efficiently. 
Therefore, join processing serves as a good example to discuss the salient 
design points and implementation details of a data processing system.&lt;/p&gt;
+
+&lt;p&gt;In this blog post, we cut through Apache Flink’s layered 
architecture and take a look at its internals with a focus on how it handles 
joins. Specifically, I will&lt;/p&gt;
+
+&lt;ul&gt;
+&lt;li&gt;show how easy it is to join data sets using Flink’s fluent APIs, 
&lt;/li&gt;
+&lt;li&gt;discuss basic distributed join strategies, Flink’s join 
implementations, and its memory management,&lt;/li&gt;
+&lt;li&gt;talk about Flink’s optimizer that automatically chooses join 
strategies,&lt;/li&gt;
+&lt;li&gt;show some performance numbers for joining data sets of different 
sizes, and finally&lt;/li&gt;
+&lt;li&gt;briefly discuss joining of co-located and pre-sorted data 
sets.&lt;/li&gt;
+&lt;/ul&gt;
+
+&lt;p&gt;&lt;em&gt;Disclaimer&lt;/em&gt;: This blog post is exclusively about 
equi-joins. Whenever I say “join” in the following, I actually mean 
“equi-join”.&lt;/p&gt;
+
+&lt;h3 id=&quot;how-do-i-join-with-flink?&quot;&gt;How do I join with 
Flink?&lt;/h3&gt;
+
+&lt;p&gt;Flink provides fluent APIs in Java and Scala to write data flow
+programs. Flink’s APIs are centered around parallel data collections
+called data sets. Data sets are processed by applying transformations
+that compute new data sets. Flink’s transformations include Map and
+Reduce as known from MapReduce &lt;a href=&quot;http://research.google.com/archive/mapreduce.html&quot;&gt;[1]&lt;/a&gt;
+but also operators for joining, co-grouping, and iterative processing.
+The documentation gives an overview of all available transformations
+&lt;a href=&quot;http://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html&quot;&gt;[2]&lt;/a&gt;.&lt;/p&gt;
+
+&lt;p&gt;Joining two Scala case class data sets is very easy as the following 
example shows:&lt;/p&gt;
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-scala&quot; data-lang=&quot;scala&quot;&gt;&lt;span 
class=&quot;c1&quot;&gt;// define your data types&lt;/span&gt;
+&lt;span class=&quot;k&quot;&gt;case&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;class&lt;/span&gt; &lt;span 
class=&quot;nc&quot;&gt;PageVisit&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;url&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;ip&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;userId&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;Long&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+&lt;span class=&quot;k&quot;&gt;case&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;class&lt;/span&gt; &lt;span 
class=&quot;nc&quot;&gt;User&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;id&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;Long&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;name&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;email&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;country&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+
+&lt;span class=&quot;c1&quot;&gt;// get your data from somewhere&lt;/span&gt;
+&lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;visits&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;DataSet&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span 
class=&quot;kt&quot;&gt;PageVisit&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;]&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;...&lt;/span&gt;
+&lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;DataSet&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span 
class=&quot;kt&quot;&gt;User&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;]&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;...&lt;/span&gt;
+
+&lt;span class=&quot;c1&quot;&gt;// filter the users data set&lt;/span&gt;
+&lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;germanUsers&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;filter&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;((&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;u&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;u&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;country&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;equals&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;de&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;))&lt;/span&gt;
+&lt;span class=&quot;c1&quot;&gt;// join data sets&lt;/span&gt;
+&lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;germanVisits&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;DataSet&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;[(&lt;/span&gt;&lt;span 
class=&quot;kt&quot;&gt;PageVisit&lt;/span&gt;, &lt;span 
class=&quot;kt&quot;&gt;User&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)]&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt;
+      &lt;span class=&quot;c1&quot;&gt;// equi-join condition 
(PageVisit.userId = User.id)&lt;/span&gt;
+     &lt;span class=&quot;n&quot;&gt;visits&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;join&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;germanUsers&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;where&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;userId&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;equalTo&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;id&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+&lt;p&gt;Flink’s APIs also allow you to:&lt;/p&gt;
+
+&lt;ul&gt;
+&lt;li&gt;apply a user-defined join function to each pair of joined elements
+instead of returning a &lt;code&gt;($Left, $Right)&lt;/code&gt; tuple,&lt;/li&gt;
+&lt;li&gt;select fields of pairs of joined Tuple elements (projection),
+and&lt;/li&gt;
+&lt;li&gt;define composite join keys such as
+&lt;code&gt;.where(&amp;quot;orderDate&amp;quot;, &amp;quot;zipCode&amp;quot;).equalTo(&amp;quot;date&amp;quot;, &amp;quot;zip&amp;quot;)&lt;/code&gt;.&lt;/li&gt;
+&lt;/ul&gt;
+
+&lt;p&gt;See the documentation for more details on Flink’s join features 
&lt;a 
href=&quot;http://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html#join&quot;&gt;[3]&lt;/a&gt;.&lt;/p&gt;
+
+&lt;h3 id=&quot;how-does-flink-join-my-data?&quot;&gt;How does Flink join my 
data?&lt;/h3&gt;
+
+&lt;p&gt;Flink uses techniques which are well known from parallel database
+systems to efficiently execute parallel joins. A join operator must
+establish all pairs of elements from its input data sets for which the
+join condition evaluates to true. In a standalone system, the most
+straightforward implementation of a join is the so-called nested-loop
+join, which builds the full Cartesian product and evaluates the join
+condition for each pair of elements. This strategy has quadratic
+complexity and obviously does not scale to large inputs.&lt;/p&gt;
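For reference, the quadratic nested-loop strategy just mentioned can be written in a few lines of plain Scala. This is a conceptual sketch, not Flink code:

```scala
// Naive nested-loop equi-join: O(|left| * |right|) comparisons.
def nestedLoopJoin[L, R, K](left: Seq[L], right: Seq[R])
                           (lKey: L => K, rKey: R => K): Seq[(L, R)] =
  for {
    l <- left
    r <- right             // enumerate the full Cartesian product...
    if lKey(l) == rKey(r)  // ...and keep pairs satisfying the join condition
  } yield (l, r)
```

Doubling both inputs quadruples the work, which is exactly why real systems fall back on the sort-based and hash-based local strategies discussed next.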
+
+&lt;p&gt;In a distributed system joins are commonly processed in two 
steps:&lt;/p&gt;
+
+&lt;ol&gt;
+&lt;li&gt;The data of both inputs is distributed across all parallel instances 
that participate in the join and&lt;/li&gt;
+&lt;li&gt;each parallel instance performs a standard stand-alone join 
algorithm on its local partition of the overall data. &lt;/li&gt;
+&lt;/ol&gt;
+
+&lt;p&gt;The distribution of data across parallel instances must ensure that 
each valid join pair can be locally built by exactly one instance. For both 
steps, there are multiple valid strategies that can be independently picked and 
which are favorable in different situations. In Flink terminology, the first 
phase is called Ship Strategy and the second phase Local Strategy. In the 
following I will describe Flink’s ship and local strategies to join two data 
sets &lt;em&gt;R&lt;/em&gt; and &lt;em&gt;S&lt;/em&gt;.&lt;/p&gt;
+
+&lt;h4 id=&quot;ship-strategies&quot;&gt;Ship Strategies&lt;/h4&gt;
+
+&lt;p&gt;Flink features two ship strategies to establish a valid data 
partitioning for a join:&lt;/p&gt;
+
+&lt;ul&gt;
+&lt;li&gt;the &lt;em&gt;Repartition-Repartition&lt;/em&gt; strategy (RR) 
and&lt;/li&gt;
+&lt;li&gt;the &lt;em&gt;Broadcast-Forward&lt;/em&gt; strategy (BF).&lt;/li&gt;
+&lt;/ul&gt;
+
+&lt;p&gt;The Repartition-Repartition strategy partitions both inputs, R and S, 
on their join key attributes using the same partitioning function. Each 
partition is assigned to exactly one parallel join instance and all data of 
that partition is sent to its associated instance. This ensures that all 
elements that share the same join key are shipped to the same parallel instance 
and can be locally joined. The cost of the RR strategy is a full shuffle of 
both data sets over the network.&lt;/p&gt;
+
+&lt;p&gt;&lt;center&gt;
+&lt;img src=&quot;/img/blog/joins-repartition.png&quot; 
style=&quot;width:90%;margin:15px&quot;&gt;
+&lt;/center&gt;&lt;/p&gt;
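&lt;p&gt;The partitioning step of the RR strategy can be sketched in plain Java. This is a hypothetical illustration and not Flink’s runtime code; the class name, hash function, and integer join keys are assumptions made for the example:&lt;/p&gt;

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RepartitionSketch {

    // Both inputs use the SAME partitioning function, so all records
    // that share a join key are shipped to the same parallel instance.
    static int targetInstance(int joinKey, int parallelism) {
        return Math.floorMod(Integer.hashCode(joinKey), parallelism);
    }

    // Group the join keys of one input by their target instance.
    static Map<Integer, List<Integer>> partition(int[] joinKeys, int parallelism) {
        Map<Integer, List<Integer>> byInstance = new HashMap<>();
        for (int key : joinKeys) {
            byInstance.computeIfAbsent(targetInstance(key, parallelism),
                                       i -> new ArrayList<>()).add(key);
        }
        return byInstance;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        Map<Integer, List<Integer>> r = partition(new int[]{1, 3, 7}, parallelism);
        Map<Integer, List<Integer>> s = partition(new int[]{3, 3, 9}, parallelism);
        // Key 3 from R and key 3 from S land on the same instance
        // and can therefore be joined locally.
        int instance = targetInstance(3, parallelism);
        System.out.println(r.get(instance) + " meets " + s.get(instance));
    }
}
```

&lt;p&gt;Because both inputs are partitioned with the same function, every valid join pair can be built by exactly one instance.&lt;/p&gt;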
+
+&lt;p&gt;The Broadcast-Forward strategy sends one complete data set (R) to 
each parallel instance that holds a partition of the other data set (S), i.e., 
each parallel instance receives the full data set R. Data set S remains local 
and is not shipped at all. The cost of the BF strategy depends on the size of R 
and the number of parallel instances it is shipped to. The size of S does not 
matter because S is not moved. The figure below illustrates how both ship 
strategies work. &lt;/p&gt;
+
+&lt;p&gt;&lt;center&gt;
+&lt;img src=&quot;/img/blog/joins-broadcast.png&quot; 
style=&quot;width:90%;margin:15px&quot;&gt;
+&lt;/center&gt;&lt;/p&gt;
+
+&lt;p&gt;The Repartition-Repartition and Broadcast-Forward ship strategies 
establish suitable data distributions to execute a distributed join. Depending 
on the operations that are applied before the join, one or even both inputs of 
a join are already distributed in a suitable way across parallel instances. In 
this case, Flink will reuse such distributions and only ship one or no input at 
all.&lt;/p&gt;
+
+&lt;h4 id=&quot;flink’s-memory-management&quot;&gt;Flink’s Memory 
Management&lt;/h4&gt;
+
+&lt;p&gt;Before delving into the details of Flink’s local join algorithms, I 
will briefly discuss Flink’s internal memory management. Data processing 
algorithms such as joining, grouping, and sorting need to hold portions of 
their input data in memory. While such algorithms perform best if there is 
enough memory available to hold all data, it is crucial to gracefully handle 
situations where the data size exceeds memory. Such situations are especially 
tricky in JVM-based systems such as Flink because the system needs to reliably 
recognize that it is short on memory. Failure to detect such situations can 
result in an &lt;code&gt;OutOfMemoryException&lt;/code&gt; and kill the JVM. 
&lt;/p&gt;
+
+&lt;p&gt;Flink handles this challenge by actively managing its memory. When a 
worker node (TaskManager) is started, it allocates a fixed portion (70% by 
default) of the JVM’s heap memory that is available after initialization as 
32KB byte arrays. These byte arrays are distributed as working memory to all 
algorithms that need to hold significant portions of data in memory. The 
algorithms receive their input data as Java data objects and serialize them 
into their working memory.&lt;/p&gt;
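&lt;p&gt;The segment pool idea can be sketched as follows. This is a deliberately simplified, hypothetical illustration of the concept, not Flink’s memory manager; spilling to disk is reduced to a counter:&lt;/p&gt;

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

public class MemorySegmentSketch {

    static final int SEGMENT_SIZE = 32 * 1024;  // 32KB segments, as in Flink

    final Deque<ByteBuffer> freeSegments = new ArrayDeque<>();
    ByteBuffer current;
    int spilledSegments = 0;  // stands in for writing a segment to disk

    MemorySegmentSketch(int numSegments) {
        for (int i = 0; i < numSegments; i++) {
            freeSegments.push(ByteBuffer.allocate(SEGMENT_SIZE));
        }
        current = freeSegments.pop();
    }

    // Serialize a record into working memory. The algorithm notices
    // deterministically when memory is exhausted, instead of risking an
    // OutOfMemoryError on the JVM heap.
    void writeRecord(byte[] serialized) {
        if (current.remaining() < serialized.length) {
            if (freeSegments.isEmpty()) {
                spilledSegments++;   // real code would write the segment to disk
                current.clear();     // ...and then reuse the byte array
            } else {
                current = freeSegments.pop();  // grab the next free segment
            }
        }
        current.put(serialized);
    }

    public static void main(String[] args) {
        MemorySegmentSketch memory = new MemorySegmentSketch(2); // 64KB pool
        byte[] record = new byte[1024];                          // 1KB records
        for (int i = 0; i < 100; i++) {                          // ~100KB input
            memory.writeRecord(record);
        }
        System.out.println("segments spilled: " + memory.spilledSegments);
    }
}
```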
+
+&lt;p&gt;This design has several nice properties. First, the number of data 
objects on the JVM heap is much lower, resulting in less garbage collection 
pressure. Second, objects on the heap have a certain space overhead and the 
binary representation is more compact. Especially data sets of many small 
elements benefit from that. Third, an algorithm knows exactly when the input 
data exceeds its working memory and can react by writing some of its filled 
byte arrays to the worker’s local filesystem. After the content of a byte 
array is written to disk, it can be reused to process more data. Reading data 
back into memory is as simple as reading the binary data from the local 
filesystem. The following figure illustrates Flink’s memory 
management.&lt;/p&gt;
+
+&lt;p&gt;&lt;center&gt;
+&lt;img src=&quot;/img/blog/joins-memmgmt.png&quot; 
style=&quot;width:90%;margin:15px&quot;&gt;
+&lt;/center&gt;&lt;/p&gt;
+
+&lt;p&gt;This active memory management makes Flink extremely robust for 
processing very large data sets on limited memory resources while preserving 
all benefits of in-memory processing if data is small enough to fit in-memory. 
De/serializing data into and from memory has a certain cost overhead compared 
to simply holding all data elements on the JVM’s heap. However, Flink 
features efficient custom de/serializers which also allow performing certain 
operations such as comparisons directly on serialized data without 
deserializing data objects from memory.&lt;/p&gt;
+
+&lt;h4 id=&quot;local-strategies&quot;&gt;Local Strategies&lt;/h4&gt;
+
+&lt;p&gt;After the data has been distributed across all parallel join 
instances using either a Repartition-Repartition or Broadcast-Forward ship 
strategy, each instance runs a local join algorithm to join the elements of its 
local partition. Flink’s runtime features two common join strategies to 
perform these local joins:&lt;/p&gt;
+
+&lt;ul&gt;
+&lt;li&gt;the &lt;em&gt;Sort-Merge-Join&lt;/em&gt; strategy (SM) and 
&lt;/li&gt;
+&lt;li&gt;the &lt;em&gt;Hybrid-Hash-Join&lt;/em&gt; strategy (HH).&lt;/li&gt;
+&lt;/ul&gt;
+
+&lt;p&gt;The Sort-Merge-Join works by first sorting both input data sets on 
their join key attributes (Sort Phase) and merging the sorted data sets as a 
second step (Merge Phase). The sort is done in-memory if the local partition of 
a data set is small enough. Otherwise, an external merge-sort is done by 
collecting data until the working memory is filled, sorting it, writing the 
sorted data to the local filesystem, and starting over by filling the working 
memory again with more incoming data. After all input data has been received, 
sorted, and written as sorted runs to the local file system, a fully sorted 
stream can be obtained. This is done by reading the partially sorted runs from 
the local filesystem and sort-merging the records on the fly. Once the sorted 
streams of both inputs are available, both streams are sequentially read and 
merge-joined in a zig-zag fashion by comparing the sorted join key attributes, 
building join element pairs for matching keys, and advancing the sorted 
stream with the lower join key. The figure below shows how the 
Sort-Merge-Join strategy works.&lt;/p&gt;
+
+&lt;p&gt;&lt;center&gt;
+&lt;img src=&quot;/img/blog/joins-smj.png&quot; 
style=&quot;width:90%;margin:15px&quot;&gt;
+&lt;/center&gt;&lt;/p&gt;
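&lt;p&gt;The merge phase described above can be sketched for already sorted integer keys. This is a hypothetical, single-threaded illustration, not Flink’s implementation:&lt;/p&gt;

```java
import java.util.ArrayList;
import java.util.List;

public class MergeJoinSketch {

    // Merge phase: both inputs are already sorted on the join key.
    // Advance the stream with the lower key; on a match, emit all pairs
    // for that key (this handles duplicate keys on both sides).
    static List<int[]> mergeJoin(int[] r, int[] s) {
        List<int[]> pairs = new ArrayList<>();
        int i = 0, j = 0;
        while (i < r.length && j < s.length) {
            if (r[i] < s[j]) {
                i++;                       // advance R, its key is lower
            } else if (r[i] > s[j]) {
                j++;                       // advance S, its key is lower
            } else {
                int key = r[i];
                int sStart = j;
                while (i < r.length && r[i] == key) {
                    for (j = sStart; j < s.length && s[j] == key; j++) {
                        pairs.add(new int[]{r[i], s[j]});
                    }
                    i++;
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<int[]> joined = mergeJoin(new int[]{1, 3, 3, 7}, new int[]{3, 7, 9});
        System.out.println(joined.size() + " join pairs");
    }
}
```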
+
+&lt;p&gt;The Hybrid-Hash-Join distinguishes its inputs as build-side and 
probe-side input and works in two phases, a build phase followed by a probe 
phase. In the build phase, the algorithm reads the build-side input and inserts 
all data elements into an in-memory hash table indexed by their join key 
attributes. If the hash table outgrows the algorithm&amp;#39;s working memory, 
parts of the hash table (ranges of hash indexes) are written to the local 
filesystem. The build phase ends after the build-side input has been fully 
consumed. In the probe phase, the algorithm reads the probe-side input and 
probes the hash table for each element using its join key attribute. If the 
element falls into a hash index range that was spilled to disk, the element is 
also written to disk. Otherwise, the element is immediately joined with all 
matching elements from the hash table. If the hash table completely fits into 
the working memory, the join is finished after the probe-side input has been 
fully consumed. Otherwise, the current hash table is dropped and a new hash 
table is built using spilled parts of the build-side input. This hash table is 
probed by the corresponding parts of the spilled probe-side input. Eventually, 
all data is joined. Hybrid-Hash-Joins perform best if the hash table completely 
fits into the working memory because an arbitrarily large probe-side input can 
be processed on-the-fly without materializing it. However, even if the 
build-side input does not fit into memory, the Hybrid-Hash-Join has very nice 
properties. In this case, in-memory processing is partially preserved and only 
a fraction of the build-side and probe-side data needs to be written to and 
read from the local filesystem. The next figure illustrates how the 
Hybrid-Hash-Join works.&lt;/p&gt;
+
+&lt;p&gt;&lt;center&gt;
+&lt;img src=&quot;/img/blog/joins-hhj.png&quot; 
style=&quot;width:90%;margin:15px&quot;&gt;
+&lt;/center&gt;&lt;/p&gt;
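&lt;p&gt;The build and probe phases can be sketched for the purely in-memory case. This hypothetical illustration omits the spilling of hash-index ranges that makes the real algorithm hybrid:&lt;/p&gt;

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashJoinSketch {

    // In-memory case only: build a hash table over the (smaller) build
    // side, then stream the probe side past it.
    static List<int[]> hashJoin(int[] buildSide, int[] probeSide) {
        // Build phase: index the build side by its join key.
        Map<Integer, List<Integer>> table = new HashMap<>();
        for (int b : buildSide) {
            table.computeIfAbsent(b, k -> new ArrayList<>()).add(b);
        }
        // Probe phase: each probe element is joined with all matches.
        List<int[]> pairs = new ArrayList<>();
        for (int p : probeSide) {
            List<Integer> matches = table.get(p);
            if (matches != null) {
                for (int b : matches) {
                    pairs.add(new int[]{b, p});
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<int[]> joined = hashJoin(new int[]{1, 3, 7}, new int[]{3, 3, 9});
        System.out.println(joined.size() + " join pairs");
    }
}
```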
+
+&lt;h3 id=&quot;how-does-flink-choose-join-strategies?&quot;&gt;How does Flink 
choose join strategies?&lt;/h3&gt;
+
+&lt;p&gt;Ship and local strategies do not depend on each other and can be 
independently chosen. Therefore, Flink can execute a join of two data sets R 
and S in nine different ways by combining any of the three ship strategies (RR, 
BF with R being broadcasted, BF with S being broadcasted) with any of the three 
local strategies (SM, HH with R being build-side, HH with S being build-side). 
Each of these strategy combinations results in different execution performance 
depending on the data sizes and the available amount of working memory. In case 
of a small data set R and a much larger data set S, broadcasting R and using it 
as build-side input of a Hybrid-Hash-Join is usually a good choice because the 
much larger data set S is not shipped and not materialized (given that the hash 
table completely fits into memory). If both data sets are rather large or the 
join is performed on many parallel instances, repartitioning both inputs is a 
robust choice.&lt;/p&gt;
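&lt;p&gt;The rule of thumb above can be phrased as a back-of-the-envelope comparison of shipped data. This hypothetical model is far simpler than Flink’s cost-based optimizer, which also accounts for local strategies, disk I/O, and existing data properties:&lt;/p&gt;

```java
public class ShipStrategyCost {

    // Back-of-the-envelope model: compare only the bytes shipped over
    // the network.
    //   Repartition-Repartition: both inputs are shuffled once
    //   Broadcast-Forward:       R is sent to every parallel instance
    static String cheaperShipStrategy(long sizeR, long sizeS, int parallelism) {
        long repartitionCost = sizeR + sizeS;
        long broadcastCost = sizeR * parallelism;
        return broadcastCost < repartitionCost
                ? "Broadcast-Forward" : "Repartition-Repartition";
    }

    public static void main(String[] args) {
        // Sizes in GB, 80 parallel instances.
        System.out.println(cheaperShipStrategy(1, 1000, 80));    // tiny R: broadcast wins
        System.out.println(cheaperShipStrategy(100, 1000, 80));  // large R: repartition wins
    }
}
```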
+
+&lt;p&gt;Flink features a cost-based optimizer which automatically chooses the 
execution strategies for all operators including joins. Without going into the 
details of cost-based optimization, this is done by computing cost estimates 
for execution plans with different strategies and picking the plan with the 
least estimated costs. Thereby, the optimizer estimates the amount of data 
which is shipped over the network and written to disk. If no reliable size 
estimates for the input data can be obtained, the optimizer falls back to 
robust default choices. A key feature of the optimizer is to reason about 
existing data properties. For example, if the data of one input is already 
partitioned in a suitable way, the generated candidate plans will not 
repartition this input. Hence, the choice of a RR ship strategy becomes more 
likely. The same applies for previously sorted data and the Sort-Merge-Join 
strategy. Flink programs can help the optimizer to reason about existing data 
properties by providing semantic information about user-defined functions &lt;a 
href=&quot;http://ci.apache.org/projects/flink/flink-docs-master/programming_guide.html#semantic-annotations&quot;&gt;[4]&lt;/a&gt;.
 While the optimizer is a killer feature of Flink, it can happen that a user 
knows better than the optimizer how to execute a specific join. Similar to 
relational database systems, Flink offers optimizer hints to tell the optimizer 
which join strategies to pick &lt;a 
href=&quot;http://ci.apache.org/projects/flink/flink-docs-master/dataset_transformations.html#join-algorithm-hints&quot;&gt;[5]&lt;/a&gt;.&lt;/p&gt;
+
+&lt;h3 id=&quot;how-is-flink’s-join-performance?&quot;&gt;How is Flink’s 
join performance?&lt;/h3&gt;
+
+&lt;p&gt;Alright, that sounds good, but how fast are joins in Flink? Let’s 
have a look. We start with a benchmark of the single-core performance of 
Flink’s Hybrid-Hash-Join implementation and run a Flink program that executes 
a Hybrid-Hash-Join with parallelism 1. We run the program on a n1-standard-2 
Google Compute Engine instance (2 vCPUs, 7.5GB memory) with two locally 
attached SSDs. We give 4GB as working memory to the join. The join program 
generates 1KB records for both inputs on-the-fly, i.e., the data is not read 
from disk. We run 1:N (Primary-Key/Foreign-Key) joins and generate the smaller 
input with unique Integer join keys and the larger input with randomly chosen 
Integer join keys that fall into the key range of the smaller input. Hence, 
each tuple of the larger side joins with exactly one tuple of the smaller side. 
The result of the join is immediately discarded. We vary the size of the 
build-side input from 1 million to 12 million elements (1GB to 12GB). The 
probe-side input is kept constant at 64 million elements (64GB). The following chart 
shows the average execution time of three runs for each setup.&lt;/p&gt;
+
+&lt;p&gt;&lt;center&gt;
+&lt;img src=&quot;/img/blog/joins-single-perf.png&quot; 
style=&quot;width:85%;margin:15px&quot;&gt;
+&lt;/center&gt;&lt;/p&gt;
+
+&lt;p&gt;The joins with 1 to 3 GB build side (blue bars) are pure in-memory 
joins. The other joins partially spill data to disk (4 to 12GB, orange bars). 
The results show that the performance of Flink’s Hybrid-Hash-Join remains 
stable as long as the hash table completely fits into memory. As soon as the 
hash table becomes larger than the working memory, parts of the hash table and 
corresponding parts of the probe side are spilled to disk. The chart shows that 
the performance of the Hybrid-Hash-Join gracefully decreases in this situation, 
i.e., there is no sharp increase in runtime when the join starts spilling. In 
combination with Flink’s robust memory management, this execution behavior 
gives smooth performance without the need for fine-grained, data-dependent 
memory tuning.&lt;/p&gt;
+
+&lt;p&gt;So, Flink’s Hybrid-Hash-Join implementation performs well on a 
single thread even for limited memory resources, but how good is Flink’s 
performance when joining larger data sets in a distributed setting? For the 
next experiment we compare the performance of the most common join strategy 
combinations, namely:&lt;/p&gt;
+
+&lt;ul&gt;
+&lt;li&gt;Broadcast-Forward, Hybrid-Hash-Join (broadcasting and building with 
the smaller side),&lt;/li&gt;
+&lt;li&gt;Repartition, Hybrid-Hash-Join (building with the smaller side), 
and&lt;/li&gt;
+&lt;li&gt;Repartition, Sort-Merge-Join&lt;/li&gt;
+&lt;/ul&gt;
+
+&lt;p&gt;for different input size ratios:&lt;/p&gt;
+
+&lt;ul&gt;
+&lt;li&gt;1GB     : 1000GB&lt;/li&gt;
+&lt;li&gt;10GB    : 1000GB&lt;/li&gt;
+&lt;li&gt;100GB   : 1000GB &lt;/li&gt;
+&lt;li&gt;1000GB  : 1000GB&lt;/li&gt;
+&lt;/ul&gt;
+
+&lt;p&gt;The Broadcast-Forward strategy is only executed for up to 10GB. 
Building a hash table from 100GB broadcasted data in 5GB working memory would 
result in spilling approximately 95GB (build input) + 950GB (probe input) in each 
parallel thread and require more than 8TB local disk storage on each 
machine.&lt;/p&gt;
+
+&lt;p&gt;As in the single-core benchmark, we run 1:N joins, generate the data 
on-the-fly, and immediately discard the result after the join. We run the 
benchmark on 10 n1-highmem-8 Google Compute Engine instances. Each instance is 
equipped with 8 cores, 52GB RAM, 40GB of which are configured as working memory 
(5GB per core), and one local SSD for spilling to disk. All benchmarks are 
performed using the same configuration, i.e., no fine tuning for the respective 
data sizes is done. The programs are executed with a parallelism of 80. 
&lt;/p&gt;
+
+&lt;p&gt;&lt;center&gt;
+&lt;img src=&quot;/img/blog/joins-dist-perf.png&quot; 
style=&quot;width:70%;margin:15px&quot;&gt;
+&lt;/center&gt;&lt;/p&gt;
+
+&lt;p&gt;As expected, the Broadcast-Forward strategy performs best for very 
small inputs because the large probe side is not shipped over the network and 
is locally joined. However, when the size of the broadcasted side grows, two 
problems arise. First, the amount of data which is shipped increases, and 
second, each parallel instance has to process the full broadcasted data set. 
The performance of both Repartitioning strategies behaves similarly for growing 
input sizes, which indicates that these strategies are mainly limited by the 
cost of the data transfer (at most 2TB are shipped over the network and 
joined). Although the Sort-Merge-Join strategy shows the worst performance in 
all shown cases, it has a right to exist because it can nicely exploit sorted 
input 
data.&lt;/p&gt;
+
+&lt;h3 
id=&quot;i’ve-got-sooo-much-data-to-join,-do-i-really-need-to-ship-it?&quot;&gt;I’ve
 got sooo much data to join, do I really need to ship it?&lt;/h3&gt;
+
+&lt;p&gt;We have seen that off-the-shelf distributed joins work really well in 
Flink. But what if your data is so huge that you do not want to shuffle it 
across your cluster? We recently added some features to Flink for specifying 
semantic properties (partitioning and sorting) on input splits and co-located 
reading of local input files. With these tools at hand, it is possible to join 
pre-partitioned data sets from your local filesystem without sending a single 
byte over your cluster’s network. If the input data is even pre-sorted, the 
join can be done as a Sort-Merge-Join without sorting, i.e., the join is 
essentially done on-the-fly. Exploiting co-location requires a very special 
setup though. Data needs to be stored on the local filesystem because HDFS does 
not feature data co-location and might move file blocks across data nodes. That 
means you need to take care of many things yourself which HDFS would have done 
for you, including replication to avoid data loss. On the other hand, the 
performance gains of joining co-located and pre-sorted data can be quite 
substantial.&lt;/p&gt;
+
+&lt;h3 
id=&quot;tl;dr:-what-should-i-remember-from-all-of-this?&quot;&gt;tl;dr: What 
should I remember from all of this?&lt;/h3&gt;
+
+&lt;ul&gt;
+&lt;li&gt;Flink’s fluent Scala and Java APIs make joins and other data 
transformations easy as cake.&lt;/li&gt;
+&lt;li&gt;The optimizer does the hard choices for you, but gives you control 
in case you know better.&lt;/li&gt;
+&lt;li&gt;Flink’s join implementations perform very well in-memory and 
gracefully degrade when going to disk.&lt;/li&gt;
+&lt;li&gt;Due to Flink’s robust memory management, there is no need for job- 
or data-specific memory tuning to avoid a nasty 
&lt;code&gt;OutOfMemoryException&lt;/code&gt;. It just runs 
out-of-the-box.&lt;/li&gt;
+&lt;/ul&gt;
+
+&lt;h4 id=&quot;references&quot;&gt;References&lt;/h4&gt;
+
+&lt;p&gt;[1] &lt;a href=&quot;&quot;&gt;“MapReduce: Simplified data 
processing on large clusters”&lt;/a&gt;, Dean, Ghemawat, 2004 &lt;br&gt;
+[2] &lt;a 
href=&quot;http://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html&quot;&gt;Flink
 0.8.1 documentation: Data Transformations&lt;/a&gt; &lt;br&gt;
+[3] &lt;a 
href=&quot;http://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html#join&quot;&gt;Flink
 0.8.1 documentation: Joins&lt;/a&gt; &lt;br&gt;
+[4] &lt;a 
href=&quot;http://ci.apache.org/projects/flink/flink-docs-master/programming_guide.html#semantic-annotations&quot;&gt;Flink
 0.9-SNAPSHOT documentation: Semantic annotations&lt;/a&gt; &lt;br&gt;
+[5] &lt;a 
href=&quot;http://ci.apache.org/projects/flink/flink-docs-master/dataset_transformations.html#join-algorithm-hints&quot;&gt;Flink
 0.9-SNAPSHOT documentation: Optimizer join hints&lt;/a&gt; &lt;br&gt;&lt;/p&gt;
+
+&lt;p&gt;&lt;br&gt;
+&lt;small&gt;Written by Fabian Hueske (&lt;a 
href=&quot;https://twitter.com/fhueske&quot;&gt;@fhueske&lt;/a&gt;).&lt;/small&gt;&lt;/p&gt;
+</description>
+<pubDate>Fri, 13 Mar 2015 11:00:00 +0100</pubDate>
+<link>http://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html</link>
+<guid 
isPermaLink="true">/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html</guid>
+</item>
+
+<item>
+<title>February 2015 in the Flink community</title>
+<description>&lt;p&gt;February might be the shortest month of the year, but 
this does not
+mean that the Flink community has not been busy adding features to the
+system and fixing bugs. Here’s a rundown of the activity in the Flink
+community last month.&lt;/p&gt;
+
+&lt;h3 id=&quot;0.8.1-release&quot;&gt;0.8.1 release&lt;/h3&gt;
+
+&lt;p&gt;Flink 0.8.1 was released. This bugfixing release resolves a total of 
22 issues.&lt;/p&gt;
+
+&lt;h3 id=&quot;new-committer&quot;&gt;New committer&lt;/h3&gt;
+
+&lt;p&gt;&lt;a href=&quot;https://github.com/mxm&quot;&gt;Max 
Michels&lt;/a&gt; has been voted a committer by the Flink PMC.&lt;/p&gt;
+
+&lt;h3 id=&quot;flink-adapter-for-apache-samoa&quot;&gt;Flink adapter for 
Apache SAMOA&lt;/h3&gt;
+
+&lt;p&gt;&lt;a href=&quot;http://samoa.incubator.apache.org&quot;&gt;Apache 
SAMOA (incubating)&lt;/a&gt; is a
+distributed streaming machine learning (ML) framework with a
+programming abstraction for distributed streaming ML algorithms. SAMOA
+runs on a variety of backend engines, currently Apache Storm and
+Apache S4.  A &lt;a 
href=&quot;https://github.com/apache/incubator-samoa/pull/11&quot;&gt;pull
+request&lt;/a&gt; is
+available at the SAMOA repository that adds a Flink adapter for 
SAMOA.&lt;/p&gt;
+
+&lt;h3 id=&quot;easy-flink-deployment-on-google-compute-cloud&quot;&gt;Easy 
Flink deployment on Google Compute Cloud&lt;/h3&gt;
+
+&lt;p&gt;Flink is now integrated in bdutil, Google’s open source tool for
+creating and configuring (Hadoop) clusters in Google Compute
+Engine. Deployment of Flink clusters is now supported starting with
+&lt;a 
href=&quot;https://groups.google.com/forum/#!topic/gcp-hadoop-announce/uVJ_6y9cGKM&quot;&gt;bdutil
+1.2.0&lt;/a&gt;.&lt;/p&gt;
+
+&lt;h3 id=&quot;flink-on-the-web&quot;&gt;Flink on the Web&lt;/h3&gt;
+
+&lt;p&gt;A new blog post on &lt;a 
href=&quot;http://flink.apache.org/news/2015/02/09/streaming-example.html&quot;&gt;Flink
+Streaming&lt;/a&gt;
+was published on the blog. Flink was mentioned in several articles on
+the web. Here are some examples:&lt;/p&gt;
+
+&lt;ul&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;http://dataconomy.com/how-flink-became-an-apache-top-level-project/&quot;&gt;How
 Flink became an Apache Top-Level Project&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;https://www.linkedin.com/pulse/stale-synchronous-parallelism-new-frontier-apache-flink-nam-luc-tran?utm_content=buffer461af&amp;amp;utm_medium=social&amp;amp;utm_source=linkedin.com&amp;amp;utm_campaign=buffer&quot;&gt;Stale
 Synchronous Parallelism: The new frontier for Apache 
Flink?&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;http://www.hadoopsphere.com/2015/02/distributed-data-processing-with-apache.html&quot;&gt;Distributed
 data processing with Apache Flink&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
+&lt;li&gt;&lt;p&gt;&lt;a 
href=&quot;http://www.hadoopsphere.com/2015/02/ciao-latency-hallo-speed.html&quot;&gt;Ciao
 latency, hello speed&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
+&lt;/ul&gt;
+
+&lt;h2 id=&quot;in-the-flink-master&quot;&gt;In the Flink master&lt;/h2&gt;
+
+&lt;p&gt;The following features have now been merged in Flink’s master 
repository.&lt;/p&gt;
+
+&lt;h3 id=&quot;gelly&quot;&gt;Gelly&lt;/h3&gt;
+
+&lt;p&gt;Gelly, Flink’s Graph API, allows users to manipulate graph-shaped 
data
+directly. Here is, for example, a calculation of shortest paths in a
+graph:&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot; data-lang=&quot;java&quot;&gt;&lt;span 
class=&quot;n&quot;&gt;Graph&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Long&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Double&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Double&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;graph&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Graph&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;fromDataSet&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;vertices&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;edges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+
+&lt;span class=&quot;n&quot;&gt;DataSet&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Vertex&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Long&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Double&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;singleSourceShortestPaths&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;graph&lt;/span&gt;
+     &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;run&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;SingleSourceShortestPaths&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Long&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;srcVertexId&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;
+           &lt;span class=&quot;n&quot;&gt;maxIterations&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getVertices&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+       
+
+&lt;p&gt;See more Gelly examples
+&lt;a 
href=&quot;https://github.com/apache/flink/tree/master/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example&quot;&gt;here&lt;/a&gt;.&lt;/p&gt;
+
+&lt;h3 id=&quot;flink-expressions&quot;&gt;Flink Expressions&lt;/h3&gt;
+
+&lt;p&gt;The newly merged
+&lt;a 
href=&quot;https://github.com/apache/flink/tree/master/flink-staging/flink-expressions&quot;&gt;flink-expressions&lt;/a&gt;
+module is the first step in Flink’s roadmap towards logical queries
+and SQL support. Here’s a preview of how you can read two CSV files,
+assign a logical schema to them, and apply transformations like filters and
+joins using logical attributes rather than physical data types.&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-scala&quot; data-lang=&quot;scala&quot;&gt;&lt;span 
class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;customers&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;getCustomerDataSet&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+ &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;as&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;id&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;mktSegment&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+ &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;filter&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt; &lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;mktSegment&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;===&lt;/span&gt; &lt;span 
class=&quot;s&quot;&gt;&amp;quot;AUTOMOBILE&amp;quot;&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+
+&lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;orders&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;getOrdersDataSet&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+ &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;filter&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;o&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;dateFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;parse&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;o&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;orderDate&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;before&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;date&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+ &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;as&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;orderId&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;custId&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;orderDate&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;shipPrio&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+
+&lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;items&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt;
+ &lt;span class=&quot;n&quot;&gt;orders&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;join&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;customers&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+   &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;where&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;custId&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;===&lt;/span&gt; &lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;id&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+   &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;select&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;orderId&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;orderDate&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;-Symbol&quot;&gt;&amp;#39;shipPrio&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+   
+
+&lt;h3 id=&quot;access-to-hcatalog-tables&quot;&gt;Access to HCatalog 
tables&lt;/h3&gt;
+
+&lt;p&gt;With the &lt;a 
href=&quot;https://github.com/apache/flink/tree/master/flink-staging/flink-hcatalog&quot;&gt;flink-hcatalog
+module&lt;/a&gt;,
+you can now conveniently access HCatalog/Hive tables. The module
+supports projection (selection and order of fields) and partition
+filters.&lt;/p&gt;
+
+&lt;h3 id=&quot;access-to-secured-yarn-clusters/hdfs.&quot;&gt;Access to 
secured YARN clusters/HDFS.&lt;/h3&gt;
+
+&lt;p&gt;With this change users can access Kerberos secured YARN (and HDFS)
+Hadoop clusters.  Also, basic support for accessing secured HDFS with
+a standalone Flink setup is now available.&lt;/p&gt;
+</description>
+<pubDate>Mon, 02 Mar 2015 11:00:00 +0100</pubDate>
+<link>http://flink.apache.org/news/2015/03/02/february-2015-in-flink.html</link>
+<guid isPermaLink="true">/news/2015/03/02/february-2015-in-flink.html</guid>
+</item>
+
+<item>
+<title>Introducing Flink Streaming</title>
+<description>&lt;p&gt;This post is the first of a series of blog posts on 
Flink Streaming,
+the recent addition to Apache Flink that makes it possible to analyze
+continuous data sources in addition to static files. Flink Streaming
+uses the pipelined Flink engine to process data streams in real time
+and offers a new API including definition of flexible windows.&lt;/p&gt;
+
+&lt;p&gt;In this post, we go through an example that uses the Flink Streaming
+API to compute statistics on stock market data that arrive
+continuously and combine the stock market data with Twitter streams.
+See the &lt;a 
href=&quot;http://flink.apache.org/docs/latest/streaming_guide.html&quot;&gt;Streaming
 Programming
+Guide&lt;/a&gt; for a
+detailed presentation of the Streaming API.&lt;/p&gt;
+
+&lt;p&gt;First, we read a bunch of stock price streams and combine them into
+one stream of market data. We apply several transformations on this
+market data stream, like rolling aggregations per stock. Then we emit
+price warning alerts when the prices are rapidly changing. Moving 
+towards more advanced features, we compute rolling correlations
+between the market data streams and a Twitter stream with stock 
mentions.&lt;/p&gt;
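The "rolling aggregations per stock" mentioned above can be sketched outside of Flink. The small Java class below (a hypothetical `RollingMax`, not part of the example code) keeps a running maximum per stock symbol, which is conceptually what a rolling max aggregation on a keyed stream maintains:

```java
import java.util.HashMap;
import java.util.Map;

public class RollingMax {
    // Running maximum observed so far, keyed by stock symbol
    private final Map<String, Double> maxBySymbol = new HashMap<>();

    // Fold a new price into the per-symbol state and return the current max
    public double update(String symbol, double price) {
        return maxBySymbol.merge(symbol, price, Math::max);
    }

    public static void main(String[] args) {
        RollingMax max = new RollingMax();
        System.out.println(max.update("SPX", 1010.0)); // 1010.0
        System.out.println(max.update("SPX", 995.0));  // 1010.0
        System.out.println(max.update("SPX", 1020.5)); // 1020.5
    }
}
```

In the actual example, Flink's windowed operators maintain this per-symbol state for you, distributed and fault tolerant, rather than a hand-rolled map.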
+
+&lt;p&gt;To run the example implementation, use the 
&lt;em&gt;0.9-SNAPSHOT&lt;/em&gt; 
+version of Flink as a dependency. The full example code base can be 
+found &lt;a 
href=&quot;https://github.com/apache/flink/blob/master/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala&quot;&gt;here&lt;/a&gt;
 in Scala and &lt;a 
href=&quot;https://github.com/apache/flink/blob/master/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java&quot;&gt;here&lt;/a&gt;
 in Java 7.&lt;/p&gt;
+
+
+&lt;p&gt;&lt;a href=&quot;#top&quot;&gt;Back to top&lt;/a&gt;&lt;/p&gt;
+
+&lt;h2 id=&quot;reading-from-multiple-inputs&quot;&gt;Reading from multiple 
inputs&lt;/h2&gt;
+
+&lt;p&gt;First, let us create the stream of stock prices:&lt;/p&gt;
+
+&lt;ol&gt;
+&lt;li&gt;Read a socket stream of stock prices.&lt;/li&gt;
+&lt;li&gt;Parse the text in the stream to create a stream of 
&lt;code&gt;StockPrice&lt;/code&gt; objects.&lt;/li&gt;
+&lt;li&gt;Add four other sources tagged with the stock symbol.&lt;/li&gt;
+&lt;li&gt;Finally, merge the streams to create a unified stream.&lt;/li&gt;
+&lt;/ol&gt;
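The parsing in step 2 can be sketched in plain Java, independent of Flink; the `StockPrice` class below is a stand-in that mirrors the one used in the example code:

```java
public class ParseStockPrice {
    // Minimal stand-in for the StockPrice type used in the example
    static class StockPrice {
        final String symbol;
        final double price;
        StockPrice(String symbol, double price) {
            this.symbol = symbol;
            this.price = price;
        }
    }

    // Parse a "symbol,price" line, as the map function in the example does
    static StockPrice parse(String line) {
        String[] tokens = line.split(",");
        return new StockPrice(tokens[0], Double.parseDouble(tokens[1]));
    }

    public static void main(String[] args) {
        StockPrice p = parse("SPX,1010.23");
        System.out.println(p.symbol + " @ " + p.price); // SPX @ 1010.23
    }
}
```

In the streaming program this logic runs inside a map transformation, applied to each line arriving on the socket.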
+
+&lt;p&gt;&lt;img alt=&quot;Reading from multiple inputs&quot; 
src=&quot;/img/blog/blog_multi_input.png&quot; width=&quot;70%&quot; 
class=&quot;img-responsive center-block&quot;&gt;&lt;/p&gt;
+
+&lt;div class=&quot;codetabs&quot; markdown=&quot;1&quot;&gt;
+&lt;div data-lang=&quot;scala&quot; markdown=&quot;1&quot;&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-scala&quot; data-lang=&quot;scala&quot;&gt;&lt;span 
class=&quot;k&quot;&gt;def&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;main&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;args&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;Array&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span 
class=&quot;kt&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;])&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
+
+  &lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;nc&quot;&gt;StreamExecutionEnvironment&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;getExecutionEnvironment&lt;/span&gt;
+
+  &lt;span class=&quot;c1&quot;&gt;//Read from a socket stream and map it to 
StockPrice objects&lt;/span&gt;
+  &lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;socketStockStream&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;socketTextStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;localhost&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;9999&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;map&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;x&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
+    &lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;split&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;x&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;split&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;,&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+    &lt;span class=&quot;nc&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;split&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;split&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;toDouble&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+  &lt;span class=&quot;o&quot;&gt;})&lt;/span&gt;
+
+  &lt;span class=&quot;c1&quot;&gt;//Generate other stock streams&lt;/span&gt;
+  &lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;nc&quot;&gt;SPX_Stream&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;generateStock&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;SPX&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;10&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;_&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+  &lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;nc&quot;&gt;FTSE_Stream&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;generateStock&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;FTSE&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;20&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;_&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+  &lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;nc&quot;&gt;DJI_Stream&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;generateStock&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;DJI&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;30&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;_&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+  &lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;nc&quot;&gt;BUX_Stream&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;generateStock&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;BUX&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;40&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;_&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+
+  &lt;span class=&quot;c1&quot;&gt;//Merge all stock streams 
together&lt;/span&gt;
+  &lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;stockStream&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;socketStockStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;merge&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;nc&quot;&gt;SPX_Stream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;nc&quot;&gt;FTSE_Stream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; 
+    &lt;span class=&quot;nc&quot;&gt;DJI_Stream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;nc&quot;&gt;BUX_Stream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+
+  &lt;span class=&quot;n&quot;&gt;stockStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;print&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt;
+
+  &lt;span class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;execute&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;Stock stream&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+&lt;span 
class=&quot;o&quot;&gt;}&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+
+&lt;/div&gt;
+
+&lt;div data-lang=&quot;java7&quot; markdown=&quot;1&quot;&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot; data-lang=&quot;java&quot;&gt;&lt;span 
class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;static&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;main&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;[]&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;args&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;throws&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Exception&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
+
+    &lt;span class=&quot;kd&quot;&gt;final&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StreamExecutionEnvironment&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt;
+        &lt;span 
class=&quot;n&quot;&gt;StreamExecutionEnvironment&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getExecutionEnvironment&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
+
+    &lt;span class=&quot;c1&quot;&gt;//Read from a socket stream and map it to 
StockPrice objects&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;socketStockStream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;
+            &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;socketTextStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;localhost&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;9999&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+            &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;map&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;MapFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
+                &lt;span class=&quot;kd&quot;&gt;private&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;[]&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;tokens&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;;&lt;/span&gt;
+
+                &lt;span class=&quot;nd&quot;&gt;@Override&lt;/span&gt;
+                &lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;map&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;throws&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Exception&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
+                    &lt;span class=&quot;n&quot;&gt;tokens&lt;/span&gt; 
&lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;split&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;,&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+                    &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; 
&lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;tokens&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;],&lt;/span&gt;
+                        &lt;span 
class=&quot;n&quot;&gt;Double&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;parseDouble&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;tokens&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;]));&lt;/span&gt;
+                &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
+            &lt;span class=&quot;o&quot;&gt;});&lt;/span&gt;
+
+    &lt;span class=&quot;c1&quot;&gt;//Generate other stock 
streams&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;SPX_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;SPX&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;10&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;FTSE_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;FTSE&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;20&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;DJI_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;DJI&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;30&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;BUX_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;BUX&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;40&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+
+    &lt;span class=&quot;c1&quot;&gt;//Merge all stock streams 
together&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;stockStream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;socketStockStream&lt;/span&gt;
+        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;merge&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;SPX_stream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;FTSE_stream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;DJI_stream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;BUX_stream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+
+    &lt;span class=&quot;n&quot;&gt;stockStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;print&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
+
+    &lt;span class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;execute&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;Stock stream&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+ 
+&lt;/div&gt;
+
+&lt;/div&gt;
+
+&lt;p&gt;See
+&lt;a 
href=&quot;http://flink.apache.org/docs/latest/streaming_guide.html#sources&quot;&gt;here&lt;/a&gt;
+for details on how to create streaming sources for Flink Streaming
+programs. Flink, of course, supports reading streams from
+&lt;a 
href=&quot;http://flink.apache.org/docs/latest/streaming_guide.html#stream-connectors&quot;&gt;external
+sources&lt;/a&gt;
+such as Apache Kafka, Apache Flume, RabbitMQ, and others. For the sake
+of this example, the data streams are simply generated using the
+&lt;code&gt;generateStock&lt;/code&gt; method:&lt;/p&gt;
+
+&lt;div class=&quot;codetabs&quot; markdown=&quot;1&quot;&gt;
+&lt;div data-lang=&quot;scala&quot; markdown=&quot;1&quot;&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-scala&quot; data-lang=&quot;scala&quot;&gt;&lt;span 
class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;symbols&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;nc&quot;&gt;List&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;SPX&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;s&quot;&gt;&amp;quot;FTSE&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;s&quot;&gt;&amp;quot;DJI&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;s&quot;&gt;&amp;quot;DJT&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;s&quot;&gt;&amp;quot;BUX&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;s&quot;&gt;&amp;quot;DAX&amp;quot;&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;s&quot;&gt;&amp;quot;GOOG&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+
+&lt;span class=&quot;k&quot;&gt;case&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;class&lt;/span&gt; &lt;span 
class=&quot;nc&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;symbol&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;price&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;Double&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
+
+&lt;span class=&quot;k&quot;&gt;def&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;generateStock&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;symbol&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;sigma&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;Int&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;Collector&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span 
class=&quot;kt&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;])&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
+  &lt;span class=&quot;k&quot;&gt;var&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;price&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;mf&quot;&gt;1000.&lt;/span&gt;
+  &lt;span class=&quot;k&quot;&gt;while&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;kc&quot;&gt;true&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;price&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;price&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;+&lt;/span&gt; &lt;span 
class=&quot;nc&quot;&gt;Random&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;nextGaussian&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;*&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;sigma&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;collect&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;nc&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;symbol&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;price&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;))&lt;/span&gt;
+    &lt;span class=&quot;nc&quot;&gt;Thread&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;sleep&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;nc&quot;&gt;Random&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;nextInt&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;200&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;))&lt;/span&gt;
+  &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
+&lt;span 
class=&quot;o&quot;&gt;}&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+
+&lt;/div&gt;
+
+&lt;div data-lang=&quot;java7&quot; markdown=&quot;1&quot;&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot; data-lang=&quot;java&quot;&gt;&lt;span 
class=&quot;kd&quot;&gt;private&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;static&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;final&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;ArrayList&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;

<TRUNCATED>
