http://git-wip-us.apache.org/repos/asf/flink-web/blob/01ee1809/content/blog/feed.xml
----------------------------------------------------------------------
diff --git a/content/blog/feed.xml b/content/blog/feed.xml
index 627f2fd..5efaf14 100644
--- a/content/blog/feed.xml
+++ b/content/blog/feed.xml
@@ -8,7 +8,7 @@
 
 <item>
 <title>April 2015 in the Flink community</title>
-<description>&lt;p&gt;April was a packed month for Apache Flink.&lt;/p&gt;
+<description>&lt;p&gt;April was a packed month for Apache Flink. &lt;/p&gt;
 
 &lt;h2 id=&quot;flink-090-milestone1-release&quot;&gt;Flink 0.9.0-milestone1 
release&lt;/h2&gt;
 
@@ -24,7 +24,7 @@
 
 &lt;h2 id=&quot;flink-on-the-web&quot;&gt;Flink on the web&lt;/h2&gt;
 
-&lt;p&gt;Fabian Hueske gave an &lt;a 
href=&quot;http://www.infoq.com/news/2015/04/hueske-apache-flink?utm_campaign=infoq_content&amp;amp;utm_source=infoq&amp;amp;utm_medium=feed&amp;amp;utm_term=global&quot;&gt;interview
 at InfoQ&lt;/a&gt; on Apache Flink.&lt;/p&gt;
+&lt;p&gt;Fabian Hueske gave an &lt;a 
href=&quot;http://www.infoq.com/news/2015/04/hueske-apache-flink?utm_campaign=infoq_content&amp;amp;utm_source=infoq&amp;amp;utm_medium=feed&amp;amp;utm_term=global&quot;&gt;interview
 at InfoQ&lt;/a&gt; on Apache Flink. &lt;/p&gt;
 
 &lt;h2 id=&quot;upcoming-events&quot;&gt;Upcoming events&lt;/h2&gt;
 
@@ -56,7 +56,7 @@ However, this approach has a few notable drawbacks. First of all it is not trivi
 &lt;img src=&quot;/img/blog/memory-mgmt.png&quot; 
style=&quot;width:90%;margin:15px&quot; /&gt;
 &lt;/center&gt;
 
-&lt;p&gt;Flink’s style of active memory management and operating on binary 
data has several benefits:&lt;/p&gt;
+&lt;p&gt;Flink’s style of active memory management and operating on binary 
data has several benefits: &lt;/p&gt;
 
 &lt;ol&gt;
   &lt;li&gt;&lt;strong&gt;Memory-safe execution &amp;amp; efficient 
out-of-core algorithms.&lt;/strong&gt; Due to the fixed amount of allocated 
memory segments, it is trivial to monitor remaining memory resources. In case 
of memory shortage, processing operators can efficiently write larger batches 
of memory segments to disk and later read them back. Consequently, 
&lt;code&gt;OutOfMemoryErrors&lt;/code&gt; are effectively prevented.&lt;/li&gt;
@@ -65,13 +65,13 @@ However, this approach has a few notable drawbacks. First of all it is not trivi
   &lt;li&gt;&lt;strong&gt;Efficient binary operations &amp;amp; cache 
sensitivity.&lt;/strong&gt; Binary data can be efficiently compared and 
operated on given a suitable binary representation. Furthermore, the binary 
representations can put related values, as well as hash codes, keys, and 
pointers, adjacently into memory. This usually gives data structures more 
cache-efficient access patterns.&lt;/li&gt;
 &lt;/ol&gt;
 
-&lt;p&gt;These properties of active memory management are very desirable in 
data processing systems for large-scale data analytics but have a significant 
price tag attached. Active memory management and operating on binary data are 
not trivial to implement, e.g., using 
&lt;code&gt;java.util.HashMap&lt;/code&gt; is much easier than implementing a 
spillable hash-table backed by byte arrays and a custom serialization stack. Of 
course Apache Flink is not the only JVM-based data processing system that 
operates on serialized binary data. Projects such as &lt;a 
href=&quot;http://drill.apache.org/&quot;&gt;Apache Drill&lt;/a&gt;, &lt;a 
href=&quot;http://ignite.incubator.apache.org/&quot;&gt;Apache Ignite 
(incubating)&lt;/a&gt;, or &lt;a 
href=&quot;http://projectgeode.org/&quot;&gt;Apache Geode 
(incubating)&lt;/a&gt; apply similar techniques, and it was recently announced 
that &lt;a href=&quot;http://spark.apache.org/&quot;&gt;Apache 
Spark&lt;/a&gt; will also evolve in this direction with &lt;a 
href=&quot;https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html&quot;&gt;Project
 Tungsten&lt;/a&gt;.&lt;/p&gt;
+&lt;p&gt;These properties of active memory management are very desirable in 
data processing systems for large-scale data analytics but have a significant 
price tag attached. Active memory management and operating on binary data are 
not trivial to implement, e.g., using 
&lt;code&gt;java.util.HashMap&lt;/code&gt; is much easier than implementing a 
spillable hash-table backed by byte arrays and a custom serialization stack. Of 
course Apache Flink is not the only JVM-based data processing system that 
operates on serialized binary data. Projects such as &lt;a 
href=&quot;http://drill.apache.org/&quot;&gt;Apache Drill&lt;/a&gt;, &lt;a 
href=&quot;http://ignite.incubator.apache.org/&quot;&gt;Apache Ignite 
(incubating)&lt;/a&gt;, or &lt;a 
href=&quot;http://projectgeode.org/&quot;&gt;Apache Geode 
(incubating)&lt;/a&gt; apply similar techniques, and it was recently announced 
that &lt;a href=&quot;http://spark.apache.org/&quot;&gt;Apache 
Spark&lt;/a&gt; will also evolve in this direction with &lt;a 
href=&quot;https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html&quot;&gt;Project
 Tungsten&lt;/a&gt;. &lt;/p&gt;
 
 &lt;p&gt;In the following we discuss in detail how Flink allocates memory, 
de/serializes objects, and operates on binary data. We will also show some 
performance numbers comparing processing objects on the heap and operating on 
binary data.&lt;/p&gt;
 
 &lt;h2 id=&quot;how-does-flink-allocate-memory&quot;&gt;How does Flink 
allocate memory?&lt;/h2&gt;
 
-&lt;p&gt;A Flink worker, called TaskManager, is composed of several internal 
components such as an actor system for coordination with the Flink master, an 
IOManager that takes care of spilling data to disk and reading it back, and a 
MemoryManager that coordinates memory usage. In the context of this blog post, 
the MemoryManager is of most interest.&lt;/p&gt;
+&lt;p&gt;A Flink worker, called TaskManager, is composed of several internal 
components such as an actor system for coordination with the Flink master, an 
IOManager that takes care of spilling data to disk and reading it back, and a 
MemoryManager that coordinates memory usage. In the context of this blog post, 
the MemoryManager is of most interest. &lt;/p&gt;
 
 &lt;p&gt;The MemoryManager takes care of allocating, accounting, and 
distributing MemorySegments to data processing operators such as sort and join 
operators. A &lt;a 
href=&quot;https://github.com/apache/flink/blob/release-0.9.0-milestone-1/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java&quot;&gt;MemorySegment&lt;/a&gt;
 is Flink’s distribution unit of memory and is backed by a regular Java byte 
array (size is 32 KB by default). A MemorySegment provides very efficient write 
and read access to its backed byte array using Java’s unsafe methods. You can 
think of a MemorySegment as a custom-tailored version of Java’s NIO 
ByteBuffer. In order to operate on multiple MemorySegments like on a larger 
chunk of consecutive memory, Flink uses logical views that implement Java’s 
&lt;code&gt;java.io.DataOutput&lt;/code&gt; and 
&lt;code&gt;java.io.DataInput&lt;/code&gt; interfaces.&lt;/p&gt;
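To make the idea of a logical view over several fixed-size segments concrete, here is a minimal Java sketch. It is not Flink's MemorySegment API: the class names `SimpleSegment`, `PagedOutput`, and `PagedInput` are hypothetical, the segments use plain array indexing rather than Java's unsafe methods, and only `writeInt`/`readInt` are shown instead of the full `java.io.DataOutput` and `java.io.DataInput` interfaces.

```java
import java.util.List;

/** Hypothetical stand-in for a memory segment: a fixed-size byte array (Flink uses 32 KB by default). */
final class SimpleSegment {
    final byte[] memory;

    SimpleSegment(int size) {
        this.memory = new byte[size];
    }
}

/** Hypothetical output view that treats a list of segments as one contiguous buffer. */
final class PagedOutput {
    private final List<SimpleSegment> segments;
    private int segment = 0; // index of the segment currently being written
    private int offset = 0;  // write position inside that segment

    PagedOutput(List<SimpleSegment> segments) {
        this.segments = segments;
    }

    void writeByte(int b) {
        if (offset == segments.get(segment).memory.length) {
            // Current segment is full: continue in the next one. A real system would
            // request another segment or spill here; this sketch simply throws
            // IndexOutOfBoundsException once the list is exhausted.
            segment++;
            offset = 0;
        }
        segments.get(segment).memory[offset++] = (byte) b;
    }

    /** Writes a big-endian int byte by byte, so a value may straddle two segments. */
    void writeInt(int v) {
        for (int shift = 24; shift >= 0; shift -= 8) {
            writeByte(v >>> shift);
        }
    }
}

/** Hypothetical input view that reads back what PagedOutput wrote. */
final class PagedInput {
    private final List<SimpleSegment> segments;
    private int segment = 0;
    private int offset = 0;

    PagedInput(List<SimpleSegment> segments) {
        this.segments = segments;
    }

    int readByte() {
        if (offset == segments.get(segment).memory.length) {
            segment++;
            offset = 0;
        }
        return segments.get(segment).memory[offset++] & 0xFF;
    }

    /** Reassembles a big-endian int, crossing segment boundaries transparently. */
    int readInt() {
        int v = 0;
        for (int i = 0; i < 4; i++) {
            v = (v << 8) | readByte();
        }
        return v;
    }
}
```

With two 6-byte segments, the second of three written ints straddles the segment boundary yet reads back unchanged; that transparency is what lets operators treat many small segments as one larger chunk of memory.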
 
@@ -83,7 +83,7 @@ However, this approach has a few notable drawbacks. First of all it is not trivi
 
 &lt;h2 id=&quot;how-does-flink-serialize-objects&quot;&gt;How does Flink 
serialize objects?&lt;/h2&gt;
 
-&lt;p&gt;The Java ecosystem offers several libraries to convert objects into a 
binary representation and back. Common alternatives are standard Java 
serialization, &lt;a 
href=&quot;https://github.com/EsotericSoftware/kryo&quot;&gt;Kryo&lt;/a&gt;, 
&lt;a href=&quot;http://avro.apache.org/&quot;&gt;Apache Avro&lt;/a&gt;, &lt;a 
href=&quot;http://thrift.apache.org/&quot;&gt;Apache Thrift&lt;/a&gt;, or 
Google’s &lt;a 
href=&quot;https://github.com/google/protobuf&quot;&gt;Protobuf&lt;/a&gt;. 
Flink includes its own custom serialization framework in order to control the 
binary representation of data. This is important because operating on binary 
data such as comparing or even manipulating binary data requires exact 
knowledge of the serialization layout. Further, configuring the serialization 
layout with respect to operations that are performed on binary data can yield a 
significant performance boost. Flink’s serialization stack also leverages the 
fact that the types of the objects which 
 are going through de/serialization are exactly known before a program is 
executed.&lt;/p&gt;
+&lt;p&gt;The Java ecosystem offers several libraries to convert objects into a 
binary representation and back. Common alternatives are standard Java 
serialization, &lt;a 
href=&quot;https://github.com/EsotericSoftware/kryo&quot;&gt;Kryo&lt;/a&gt;, 
&lt;a href=&quot;http://avro.apache.org/&quot;&gt;Apache Avro&lt;/a&gt;, &lt;a 
href=&quot;http://thrift.apache.org/&quot;&gt;Apache Thrift&lt;/a&gt;, or 
Google’s &lt;a 
href=&quot;https://github.com/google/protobuf&quot;&gt;Protobuf&lt;/a&gt;. 
Flink includes its own custom serialization framework in order to control the 
binary representation of data. This is important because operating on binary 
data such as comparing or even manipulating binary data requires exact 
knowledge of the serialization layout. Further, configuring the serialization 
layout with respect to operations that are performed on binary data can yield a 
significant performance boost. Flink’s serialization stack also leverages the 
fact that the types of the objects which 
 are going through de/serialization are exactly known before a program is 
executed. &lt;/p&gt;
 
 &lt;p&gt;Flink programs can process data represented as arbitrary Java or 
Scala objects. Before a program is optimized, the data types at each processing 
step of the program’s data flow need to be identified. For Java programs, 
Flink features a reflection-based type extraction component to analyze the 
return types of user-defined functions. Scala programs are analyzed with help 
of the Scala compiler. Flink represents each data type with a &lt;a 
href=&quot;https://github.com/apache/flink/blob/release-0.9.0-milestone-1/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java&quot;&gt;TypeInformation&lt;/a&gt;.
 Flink has TypeInformations for several kinds of data types, 
including:&lt;/p&gt;
 
@@ -93,11 +93,11 @@ However, this approach has a few notable drawbacks. First of all it is not trivi
   &lt;li&gt;WritableTypeInfo: Any implementation of Hadoop’s Writable 
interface.&lt;/li&gt;
   &lt;li&gt;TupleTypeInfo: Any Flink tuple (Tuple1 to Tuple25). Flink tuples 
are Java representations for fixed-length tuples with typed fields.&lt;/li&gt;
   &lt;li&gt;CaseClassTypeInfo: Any Scala CaseClass (including Scala 
tuples).&lt;/li&gt;
-  &lt;li&gt;PojoTypeInfo: Any POJO (Java or Scala), i.e., an object with all 
fields either being public or accessible through getters and setters that follow 
the common naming conventions.&lt;/li&gt;
+  &lt;li&gt;PojoTypeInfo: Any POJO (Java or Scala), i.e., an object with all 
fields either being public or accessible through getters and setters that follow 
the common naming conventions. &lt;/li&gt;
   &lt;li&gt;GenericTypeInfo: Any data type that cannot be identified as 
another type.&lt;/li&gt;
 &lt;/ul&gt;
 
-&lt;p&gt;Each TypeInformation provides a serializer for the data type it 
represents. For example, a BasicTypeInfo returns a serializer that writes the 
respective primitive type, the serializer of a WritableTypeInfo delegates 
de/serialization to the write() and readFields() methods of the object 
implementing Hadoop’s Writable interface, and a GenericTypeInfo returns a 
serializer that delegates serialization to Kryo. Object serialization to a 
DataOutput which is backed by Flink MemorySegments goes automatically through 
Java’s efficient unsafe operations. For data types that can be used as keys, 
i.e., compared and hashed, the TypeInformation provides TypeComparators. 
TypeComparators compare and hash objects and can - depending on the concrete 
data type - also efficiently compare binary representations and extract 
fixed-length binary key prefixes.&lt;/p&gt;
+&lt;p&gt;Each TypeInformation provides a serializer for the data type it 
represents. For example, a BasicTypeInfo returns a serializer that writes the 
respective primitive type, the serializer of a WritableTypeInfo delegates 
de/serialization to the write() and readFields() methods of the object 
implementing Hadoop’s Writable interface, and a GenericTypeInfo returns a 
serializer that delegates serialization to Kryo. Object serialization to a 
DataOutput which is backed by Flink MemorySegments goes automatically through 
Java’s efficient unsafe operations. For data types that can be used as keys, 
i.e., compared and hashed, the TypeInformation provides TypeComparators. 
TypeComparators compare and hash objects and can - depending on the concrete 
data type - also efficiently compare binary representations and extract 
fixed-length binary key prefixes. &lt;/p&gt;
 
 &lt;p&gt;Tuple, Pojo, and CaseClass types are composite types, i.e., 
containers for one or more possibly nested data types. As such, their 
serializers and comparators are also composite and delegate the serialization 
and comparison of their member data types to the respective serializers and 
comparators. The following figure illustrates the serialization of a (nested) 
&lt;code&gt;Tuple3&amp;lt;Integer, Double, Person&amp;gt;&lt;/code&gt; object 
where &lt;code&gt;Person&lt;/code&gt; is a POJO and defined as 
follows:&lt;/p&gt;
 
@@ -110,13 +110,13 @@ However, this approach has a few notable drawbacks. First of all it is not trivi
 &lt;img src=&quot;/img/blog/data-serialization.png&quot; 
style=&quot;width:80%;margin:15px&quot; /&gt;
 &lt;/center&gt;
 
-&lt;p&gt;Flink’s type system can be easily extended by providing custom 
TypeInformations, Serializers, and Comparators to improve the performance of 
serializing and comparing custom data types.&lt;/p&gt;
+&lt;p&gt;Flink’s type system can be easily extended by providing custom 
TypeInformations, Serializers, and Comparators to improve the performance of 
serializing and comparing custom data types. &lt;/p&gt;
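The composite serialization described earlier can be sketched in plain Java. This is a hypothetical, minimal contract (`Ser`, `Triple`, and `Sers` are illustrative names, not Flink's TypeSerializer API) that writes each field of a three-field tuple with its own field serializer, in field order; because a field can itself be a `Triple`, nesting falls out naturally. The `String` field merely stands in for a POJO whose definition is not part of this excerpt.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical minimal serializer contract (not Flink's TypeSerializer). */
interface Ser<T> {
    void serialize(T value, DataOutput out) throws IOException;
    T deserialize(DataInput in) throws IOException;
}

/** Hypothetical fixed-arity tuple with three typed fields. */
final class Triple<A, B, C> {
    final A f0; final B f1; final C f2;
    Triple(A f0, B f1, C f2) { this.f0 = f0; this.f1 = f1; this.f2 = f2; }
}

final class Sers {
    static final Ser<Integer> INT = new Ser<Integer>() {
        public void serialize(Integer v, DataOutput out) throws IOException { out.writeInt(v); }
        public Integer deserialize(DataInput in) throws IOException { return in.readInt(); }
    };
    static final Ser<Double> DOUBLE = new Ser<Double>() {
        public void serialize(Double v, DataOutput out) throws IOException { out.writeDouble(v); }
        public Double deserialize(DataInput in) throws IOException { return in.readDouble(); }
    };
    static final Ser<String> STRING = new Ser<String>() {
        public void serialize(String v, DataOutput out) throws IOException { out.writeUTF(v); }
        public String deserialize(DataInput in) throws IOException { return in.readUTF(); }
    };

    /** Composite serializer: delegates each field to its own serializer, in field order. */
    static <A, B, C> Ser<Triple<A, B, C>> triple(Ser<A> s0, Ser<B> s1, Ser<C> s2) {
        return new Ser<Triple<A, B, C>>() {
            public void serialize(Triple<A, B, C> t, DataOutput out) throws IOException {
                s0.serialize(t.f0, out);
                s1.serialize(t.f1, out);
                s2.serialize(t.f2, out);
            }
            public Triple<A, B, C> deserialize(DataInput in) throws IOException {
                // Fields must be read back in exactly the order they were written.
                A a = s0.deserialize(in);
                B b = s1.deserialize(in);
                C c = s2.deserialize(in);
                return new Triple<>(a, b, c);
            }
        };
    }

    /** Serializes into a byte array; DataOutputStream writes straight through, so no flush is needed. */
    static <T> byte[] toBytes(Ser<T> ser, T value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            ser.serialize(value, new DataOutputStream(bytes));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static <T> T fromBytes(Ser<T> ser, byte[] data) {
        try {
            return ser.deserialize(new DataInputStream(new ByteArrayInputStream(data)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A nested value such as `(42, 3.5, (7, 1.0, "Ann"))` serializes to 29 bytes: 4 + 8 bytes for the outer int and double, then 4 + 8 bytes for the inner ones, plus the 2-byte length prefix that `writeUTF` emits and 3 bytes for `"Ann"`.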
 
 &lt;h2 id=&quot;how-does-flink-operate-on-binary-data&quot;&gt;How does Flink 
operate on binary data?&lt;/h2&gt;
 
 &lt;p&gt;Similar to many other data processing APIs (including SQL), Flink’s 
APIs provide transformations to group, sort, and join data sets. These 
transformations operate on potentially very large data sets. Relational 
database systems have featured very efficient algorithms for these purposes for 
several decades, including external merge-sort, merge-join, and hybrid 
hash-join. Flink builds on this technology, but generalizes it to handle 
arbitrary objects using its custom serialization and comparison stack. In the 
following, we show how Flink operates on binary data using the example of 
Flink’s in-memory sort algorithm.&lt;/p&gt;
 
-&lt;p&gt;Flink assigns a memory budget to its data processing operators. Upon 
initialization, a sort algorithm requests its memory budget from the 
MemoryManager and receives a corresponding set of MemorySegments. The set of 
MemorySegments becomes the memory pool of a so-called sort buffer which 
collects the data that is to be sorted. The following figure illustrates how data 
objects are serialized into the sort buffer.&lt;/p&gt;
+&lt;p&gt;Flink assigns a memory budget to its data processing operators. Upon 
initialization, a sort algorithm requests its memory budget from the 
MemoryManager and receives a corresponding set of MemorySegments. The set of 
MemorySegments becomes the memory pool of a so-called sort buffer which 
collects the data that is to be sorted. The following figure illustrates how data 
objects are serialized into the sort buffer. &lt;/p&gt;
 
 &lt;center&gt;
 &lt;img src=&quot;/img/blog/sorting-binary-data-1.png&quot; 
style=&quot;width:90%;margin:15px&quot; /&gt;
@@ -129,7 +129,7 @@ The following figure shows how two objects are compared.&lt;/p&gt;
 &lt;img src=&quot;/img/blog/sorting-binary-data-2.png&quot; 
style=&quot;width:80%;margin:15px&quot; /&gt;
 &lt;/center&gt;
 
-&lt;p&gt;The sort buffer compares two elements by comparing their binary 
fixed-length sort keys. The comparison is conclusive if it is either done on a full key 
(not a prefix key) or if the binary prefix keys are not equal. If the prefix 
keys are equal (or the sort key data type does not provide a binary prefix 
key), the sort buffer follows the pointers to the actual object data, 
deserializes both objects and compares the objects. Depending on the result of 
the comparison, the sort algorithm decides whether to swap the compared 
elements or not. The sort buffer swaps two elements by moving their fixed-length 
keys and pointers. The actual data is not moved. Once the sort algorithm 
finishes, the pointers in the sort buffer are correctly ordered. The following 
figure shows how the sorted data is returned from the sort buffer.&lt;/p&gt;
+&lt;p&gt;The sort buffer compares two elements by comparing their binary 
fixed-length sort keys. The comparison is conclusive if it is either done on a full key 
(not a prefix key) or if the binary prefix keys are not equal. If the prefix 
keys are equal (or the sort key data type does not provide a binary prefix 
key), the sort buffer follows the pointers to the actual object data, 
deserializes both objects and compares the objects. Depending on the result of 
the comparison, the sort algorithm decides whether to swap the compared 
elements or not. The sort buffer swaps two elements by moving their fixed-length 
keys and pointers. The actual data is not moved. Once the sort algorithm 
finishes, the pointers in the sort buffer are correctly ordered. The following 
figure shows how the sorted data is returned from the sort buffer. &lt;/p&gt;
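A compact Java sketch of the comparison scheme just described, under simplifying assumptions: keys are Strings, records are stored length-prefixed in a single byte array instead of MemorySegments, the fixed-length prefix is the first four UTF-8 bytes of the key, and a simple insertion sort stands in for the actual sort algorithm. `PrefixSortBuffer` and its methods are hypothetical names, not Flink classes. Note that `swap` moves only the prefix and pointer entries; the serialized records never move.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified sort buffer for String keys (not Flink's implementation). */
final class PrefixSortBuffer {
    private final ByteArrayOutputStream records = new ByteArrayOutputStream(); // serialized data
    private int[] prefixes = new int[8]; // fixed-length binary key prefixes
    private int[] pointers = new int[8]; // offsets of the records in the data area
    private int count = 0;
    private byte[] data;                 // snapshot of the data area used while sorting
    int fullComparisons = 0;             // how often the prefix alone was not enough

    void add(String key) {
        byte[] utf8 = key.getBytes(StandardCharsets.UTF_8);
        if (count == prefixes.length) {
            prefixes = Arrays.copyOf(prefixes, count * 2);
            pointers = Arrays.copyOf(pointers, count * 2);
        }
        prefixes[count] = prefixOf(utf8);
        pointers[count] = records.size();
        count++;
        // Record layout: 4-byte big-endian length followed by the UTF-8 bytes.
        for (int shift = 24; shift >= 0; shift -= 8) {
            records.write(utf8.length >>> shift);
        }
        records.write(utf8, 0, utf8.length);
    }

    /** First four key bytes packed into an int, zero-padded for short keys. */
    private static int prefixOf(byte[] key) {
        int p = 0;
        for (int i = 0; i < 4; i++) {
            p = (p << 8) | (i < key.length ? key[i] & 0xFF : 0);
        }
        return p;
    }

    private byte[] readRecord(int offset) {
        int len = 0;
        for (int i = 0; i < 4; i++) {
            len = (len << 8) | (data[offset + i] & 0xFF);
        }
        return Arrays.copyOfRange(data, offset + 4, offset + 4 + len);
    }

    private int compare(int i, int j) {
        int c = Integer.compareUnsigned(prefixes[i], prefixes[j]);
        if (c != 0) {
            return c; // prefixes differ: decided without touching the records
        }
        fullComparisons++; // prefixes equal: follow the pointers and compare full keys
        byte[] a = readRecord(pointers[i]);
        byte[] b = readRecord(pointers[j]);
        for (int k = 0; k < Math.min(a.length, b.length); k++) {
            int d = (a[k] & 0xFF) - (b[k] & 0xFF);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;
    }

    /** Swaps index entries only; the serialized records stay where they are. */
    private void swap(int i, int j) {
        int p = prefixes[i]; prefixes[i] = prefixes[j]; prefixes[j] = p;
        int q = pointers[i]; pointers[i] = pointers[j]; pointers[j] = q;
    }

    List<String> sortAndRead() {
        data = records.toByteArray();
        for (int i = 1; i < count; i++) { // insertion sort over the index entries
            for (int j = i; j > 0 && compare(j - 1, j) > 0; j--) {
                swap(j - 1, j);
            }
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < count; i++) { // read records back in pointer order
            out.add(new String(readRecord(pointers[i]), StandardCharsets.UTF_8));
        }
        return out;
    }
}
```

For keys such as `flink` and `flint` the four-byte prefixes (`flin`) are equal, so only that kind of pair falls back to reading the full records; every other pair in such a small example is settled by comparing two ints.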
 
 &lt;center&gt;
 &lt;img src=&quot;/img/blog/sorting-binary-data-3.png&quot; 
style=&quot;width:80%;margin:15px&quot; /&gt;
@@ -147,7 +147,7 @@ The following figure shows how two objects are compared.&lt;/p&gt;
   &lt;li&gt;&lt;strong&gt;Kryo-serialized.&lt;/strong&gt; The tuple fields are 
serialized into a sort buffer of 600 MB size using Kryo serialization and 
sorted without binary sort keys. This means that each pair-wise comparison 
requires two objects to be deserialized.&lt;/li&gt;
 &lt;/ol&gt;
 
-&lt;p&gt;All sort methods are implemented using a single thread. The reported 
times are averaged over ten runs. After each run, we call 
&lt;code&gt;System.gc()&lt;/code&gt; to request a garbage collection run which 
does not go into measured execution time. The following figure shows the time 
to store the input data in memory, sort it, and read it back as 
objects.&lt;/p&gt;
+&lt;p&gt;All sort methods are implemented using a single thread. The reported 
times are averaged over ten runs. After each run, we call 
&lt;code&gt;System.gc()&lt;/code&gt; to request a garbage collection run which 
does not go into measured execution time. The following figure shows the time 
to store the input data in memory, sort it, and read it back as objects. 
&lt;/p&gt;
 
 &lt;center&gt;
 &lt;img src=&quot;/img/blog/sort-benchmark.png&quot; 
style=&quot;width:90%;margin:15px&quot; /&gt;
@@ -205,13 +205,13 @@ The following figure shows how two objects are compared.&lt;/p&gt;
 
 &lt;p&gt;&lt;br /&gt;&lt;/p&gt;
 
-&lt;p&gt;To summarize, the experiments verify the previously stated benefits 
of operating on binary data.&lt;/p&gt;
+&lt;p&gt;To summarize, the experiments verify the previously stated benefits 
of operating on binary data. &lt;/p&gt;
 
 &lt;h2 id=&quot;were-not-done-yet&quot;&gt;We’re not done yet!&lt;/h2&gt;
 
-&lt;p&gt;Apache Flink features quite a few advanced techniques to safely 
and efficiently process huge amounts of data with limited memory resources. 
However, there are a few points that could make Flink even more efficient. The 
Flink community is working on moving the managed memory to off-heap memory. 
This will allow for smaller JVMs, lower garbage collection overhead, and also 
easier system configuration. With Flink’s Table API, the semantics of all 
operations such as aggregations and projections are known (in contrast to 
black-box user-defined functions). Hence we can generate code for Table API 
operations that directly operate on binary data. Further improvements include 
serialization layouts which are tailored towards the operations that are 
applied on the binary data and code generation for serializers and 
comparators.&lt;/p&gt;
+&lt;p&gt;Apache Flink features quite a few advanced techniques to safely 
and efficiently process huge amounts of data with limited memory resources. 
However, there are a few points that could make Flink even more efficient. The 
Flink community is working on moving the managed memory to off-heap memory. 
This will allow for smaller JVMs, lower garbage collection overhead, and also 
easier system configuration. With Flink’s Table API, the semantics of all 
operations such as aggregations and projections are known (in contrast to 
black-box user-defined functions). Hence we can generate code for Table API 
operations that directly operate on binary data. Further improvements include 
serialization layouts which are tailored towards the operations that are 
applied on the binary data and code generation for serializers and comparators. 
&lt;/p&gt;
 
-&lt;p&gt;The groundwork (and a lot more) for operating on binary data is done 
but there is still some room for making Flink even better and faster. If you 
are crazy about performance and like to juggle lots of bits and bytes, join 
the Flink community!&lt;/p&gt;
+&lt;p&gt;The groundwork (and a lot more) for operating on binary data is done 
but there is still some room for making Flink even better and faster. If you 
are crazy about performance and like to juggle lots of bits and bytes, join 
the Flink community! &lt;/p&gt;
 
 &lt;h2 id=&quot;tldr-give-me-three-things-to-remember&quot;&gt;TL;DR; Give me 
three things to remember!&lt;/h2&gt;
 
@@ -565,7 +565,7 @@ Tez as an execution backend instead of Flink’s own network stack. Learn more
 &lt;p&gt;In this blog post, we cut through Apache Flink’s layered 
architecture and take a look at its internals with a focus on how it handles 
joins. Specifically, I will&lt;/p&gt;
 
 &lt;ul&gt;
-  &lt;li&gt;show how easy it is to join data sets using Flink’s fluent 
APIs,&lt;/li&gt;
+  &lt;li&gt;show how easy it is to join data sets using Flink’s fluent APIs, 
&lt;/li&gt;
   &lt;li&gt;discuss basic distributed join strategies, Flink’s join 
implementations, and its memory management,&lt;/li&gt;
   &lt;li&gt;talk about Flink’s optimizer that automatically chooses join 
strategies,&lt;/li&gt;
   &lt;li&gt;show some performance numbers for joining data sets of different 
sizes, and finally&lt;/li&gt;
@@ -576,7 +576,7 @@ Tez as an execution backend instead of Flink’s own network stack. Learn more
 
 &lt;h3 id=&quot;how-do-i-join-with-flink&quot;&gt;How do I join with 
Flink?&lt;/h3&gt;
 
-&lt;p&gt;Flink provides fluent APIs in Java and Scala to write data flow 
programs. Flink’s APIs are centered around parallel data collections which 
are called data sets. Data sets are processed by applying transformations that 
compute new data sets. Flink’s transformations include Map and Reduce as 
known from MapReduce &lt;a 
href=&quot;http://research.google.com/archive/mapreduce.html&quot;&gt;[1]&lt;/a&gt;
 but also operators for joining, co-grouping, and iterative processing. The 
documentation gives an overview of all available transformations &lt;a 
href=&quot;http://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html&quot;&gt;[2]&lt;/a&gt;.&lt;/p&gt;
+&lt;p&gt;Flink provides fluent APIs in Java and Scala to write data flow 
programs. Flink’s APIs are centered around parallel data collections which 
are called data sets. Data sets are processed by applying transformations that 
compute new data sets. Flink’s transformations include Map and Reduce as 
known from MapReduce &lt;a 
href=&quot;http://research.google.com/archive/mapreduce.html&quot;&gt;[1]&lt;/a&gt;
 but also operators for joining, co-grouping, and iterative processing. The 
documentation gives an overview of all available transformations &lt;a 
href=&quot;http://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html&quot;&gt;[2]&lt;/a&gt;.
 &lt;/p&gt;
 
 &lt;p&gt;Joining two Scala case class data sets is very easy as the following 
example shows:&lt;/p&gt;
 
@@ -613,7 +613,7 @@ Tez as an execution backend instead of Flink’s own network stack. Learn more
 
 &lt;ol&gt;
   &lt;li&gt;The data of both inputs is distributed across all parallel 
instances that participate in the join and&lt;/li&gt;
-  &lt;li&gt;each parallel instance performs a standard stand-alone join 
algorithm on its local partition of the overall data.&lt;/li&gt;
+  &lt;li&gt;each parallel instance performs a standard stand-alone join 
algorithm on its local partition of the overall data. &lt;/li&gt;
 &lt;/ol&gt;
 
 &lt;p&gt;The distribution of data across parallel instances must ensure that 
each valid join pair can be locally built by exactly one instance. For both 
steps, there are multiple valid strategies that can be independently picked and 
which are favorable in different situations. In Flink terminology, the first 
phase is called Ship Strategy and the second phase Local Strategy. In the 
following I will describe Flink’s ship and local strategies to join two data 
sets &lt;em&gt;R&lt;/em&gt; and &lt;em&gt;S&lt;/em&gt;.&lt;/p&gt;
@@ -632,7 +632,7 @@ Tez as an execution backend instead of Flink’s own network stack. Learn more
 &lt;img src=&quot;/img/blog/joins-repartition.png&quot; 
style=&quot;width:90%;margin:15px&quot; /&gt;
 &lt;/center&gt;
 
-&lt;p&gt;The Broadcast-Forward strategy sends one complete data set (R) to 
each parallel instance that holds a partition of the other data set (S), i.e., 
each parallel instance receives the full data set R. Data set S remains local 
and is not shipped at all. The cost of the BF strategy depends on the size of R 
and the number of parallel instances it is shipped to. The size of S does not 
matter because S is not moved. The figure below illustrates how both ship 
strategies work.&lt;/p&gt;
+&lt;p&gt;The Broadcast-Forward strategy sends one complete data set (R) to 
each parallel instance that holds a partition of the other data set (S), i.e., 
each parallel instance receives the full data set R. Data set S remains local 
and is not shipped at all. The cost of the BF strategy depends on the size of R 
and the number of parallel instances it is shipped to. The size of S does not 
matter because S is not moved. The figure below illustrates how both ship 
strategies work. &lt;/p&gt;
 
 &lt;center&gt;
 &lt;img src=&quot;/img/blog/joins-broadcast.png&quot; 
style=&quot;width:90%;margin:15px&quot; /&gt;
@@ -641,7 +641,7 @@ Tez as an execution backend instead of Flink’s own network stack. Learn more
 &lt;p&gt;The Repartition-Repartition and Broadcast-Forward ship strategies 
establish suitable data distributions to execute a distributed join. Depending 
on the operations that are applied before the join, one or even both inputs of 
a join are already distributed in a suitable way across parallel instances. In 
this case, Flink will reuse such distributions and only ship one or no input at 
all.&lt;/p&gt;
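Both ship strategies can be simulated in a few lines of Java to check that they produce the same join result. This is a hypothetical, single-process sketch: `Row`, `hashPartition`, `roundRobin`, and `localJoin` are illustrative names, parallel instances are just list indices, the round-robin placement of S is an assumed pre-existing distribution, and the local join is a naive nested loop rather than one of Flink's local strategies.

```java
import java.util.ArrayList;
import java.util.List;

final class ShipStrategies {
    /** Hypothetical element: an int join key and a string payload. */
    static final class Row {
        final int key;
        final String value;
        Row(int key, String value) { this.key = key; this.value = value; }
    }

    /** Sends each row to instance hash(key) mod n, so equal keys meet on exactly one instance. */
    static List<List<Row>> hashPartition(List<Row> rows, int n) {
        List<List<Row>> parts = new ArrayList<>();
        for (int i = 0; i < n; i++) parts.add(new ArrayList<>());
        for (Row r : rows) parts.get(Math.floorMod(Integer.hashCode(r.key), n)).add(r);
        return parts;
    }

    /** Round-robin placement, standing in for however S happens to be distributed already. */
    static List<List<Row>> roundRobin(List<Row> rows, int n) {
        List<List<Row>> parts = new ArrayList<>();
        for (int i = 0; i < n; i++) parts.add(new ArrayList<>());
        for (int i = 0; i < rows.size(); i++) parts.get(i % n).add(rows.get(i));
        return parts;
    }

    /** Naive local join on one instance (nested loop, for brevity). */
    static List<String> localJoin(List<Row> r, List<Row> s) {
        List<String> out = new ArrayList<>();
        for (Row a : r)
            for (Row b : s)
                if (a.key == b.key) out.add(a.value + "-" + b.value);
        return out;
    }

    /** Repartition-Repartition: both inputs are shipped with the same hash partitioning. */
    static List<String> repartitionRepartition(List<Row> r, List<Row> s, int n) {
        List<List<Row>> rParts = hashPartition(r, n);
        List<List<Row>> sParts = hashPartition(s, n);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) out.addAll(localJoin(rParts.get(i), sParts.get(i)));
        return out;
    }

    /** Broadcast-Forward: every instance receives all of R; its partition of S stays local. */
    static List<String> broadcastForward(List<Row> r, List<List<Row>> sParts) {
        List<String> out = new ArrayList<>();
        for (List<Row> sLocal : sParts) out.addAll(localJoin(r, sLocal));
        return out;
    }
}
```

The cost difference mirrors the text: Repartition-Repartition ships both inputs once, while Broadcast-Forward copies R to every instance but never moves S.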
 
 &lt;h4 id=&quot;flinks-memory-management&quot;&gt;Flink’s Memory 
Management&lt;/h4&gt;
-&lt;p&gt;Before delving into the details of Flink’s local join algorithms, I 
will briefly discuss Flink’s internal memory management. Data processing 
algorithms such as joining, grouping, and sorting need to hold portions of 
their input data in memory. While such algorithms perform best if there is 
enough memory available to hold all data, it is crucial to gracefully handle 
situations where the data size exceeds memory. Such situations are especially 
tricky in JVM-based systems such as Flink because the system needs to reliably 
recognize that it is short on memory. Failure to detect such situations can 
result in an &lt;code&gt;OutOfMemoryError&lt;/code&gt; and kill the 
JVM.&lt;/p&gt;
+&lt;p&gt;Before delving into the details of Flink’s local join algorithms, I 
will briefly discuss Flink’s internal memory management. Data processing 
algorithms such as joining, grouping, and sorting need to hold portions of 
their input data in memory. While such algorithms perform best if there is 
enough memory available to hold all data, it is crucial to gracefully handle 
situations where the data size exceeds memory. Such situations are especially 
tricky in JVM-based systems such as Flink because the system needs to reliably 
recognize that it is short on memory. Failure to detect such situations can 
result in an &lt;code&gt;OutOfMemoryError&lt;/code&gt; and kill the JVM. 
&lt;/p&gt;
 
 &lt;p&gt;Flink handles this challenge by actively managing its memory. When a 
worker node (TaskManager) is started, it allocates a fixed portion (70% by 
default) of the JVM’s heap memory that is available after initialization as 
32KB byte arrays. These byte arrays are distributed as working memory to all 
algorithms that need to hold significant portions of data in memory. The 
algorithms receive their input data as Java data objects and serialize them 
into their working memory.&lt;/p&gt;
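The fixed budget can be illustrated with a small pool that allocates its segments once and afterwards only hands them out and takes them back. `SegmentPool`, `request`, and `release` are hypothetical names, not Flink's MemoryManager API; the 70% fraction and the 32KB segment size are the defaults quoted above, while the heap size used in the example is an arbitrary assumption. The point of the design is that a request the pool cannot satisfy is refused up front, which is the signal for an algorithm to spill to disk instead of running out of heap.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical fixed-budget pool of 32 KB segments (not Flink's MemoryManager API). */
final class SegmentPool {
    static final int SEGMENT_SIZE = 32 * 1024;

    private final ArrayDeque<byte[]> free = new ArrayDeque<>();
    final int totalSegments;

    /** Allocates fraction * availableHeapBytes up front, as SEGMENT_SIZE byte arrays. */
    SegmentPool(long availableHeapBytes, double fraction) {
        totalSegments = (int) (availableHeapBytes * fraction / SEGMENT_SIZE);
        for (int i = 0; i < totalSegments; i++) {
            free.push(new byte[SEGMENT_SIZE]);
        }
    }

    int available() {
        return free.size();
    }

    /**
     * Hands out exactly n segments, or none at all if the budget is insufficient.
     * An empty result tells the caller to spill instead of allocating more heap.
     */
    List<byte[]> request(int n) {
        List<byte[]> granted = new ArrayList<>();
        if (n > free.size()) {
            return granted;
        }
        for (int i = 0; i < n; i++) {
            granted.add(free.pop());
        }
        return granted;
    }

    /** Returns segments to the pool; the caller's list is cleared so it cannot reuse them. */
    void release(List<byte[]> segments) {
        for (byte[] s : segments) {
            free.push(s);
        }
        segments.clear();
    }
}
```

With 4,000,000 available bytes and the default fraction of 0.7, this pool holds 85 segments (2,800,000 / 32,768, rounded down); a request for more segments than remain is refused rather than partially granted.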
 
@@ -658,7 +658,7 @@ Tez as an execution backend instead of Flink’s own network stack. Learn more
 &lt;p&gt;After the data has been distributed across all parallel join 
instances using either a Repartition-Repartition or Broadcast-Forward ship 
strategy, each instance runs a local join algorithm to join the elements of its 
local partition. Flink’s runtime features two common join strategies to 
perform these local joins:&lt;/p&gt;
 
 &lt;ul&gt;
-  &lt;li&gt;the &lt;em&gt;Sort-Merge-Join&lt;/em&gt; strategy (SM) 
and&lt;/li&gt;
+  &lt;li&gt;the &lt;em&gt;Sort-Merge-Join&lt;/em&gt; strategy (SM) and 
&lt;/li&gt;
   &lt;li&gt;the &lt;em&gt;Hybrid-Hash-Join&lt;/em&gt; strategy (HH).&lt;/li&gt;
 &lt;/ul&gt;
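The two local strategies can be contrasted with minimal, purely in-memory Java sketches. These are simplifications, not Flink's implementations: the hash join here is a classic build/probe hash join without the partitioning and spilling that give Hybrid-Hash-Join its name, and the sort-merge join sorts Java lists rather than serialized data. `Rec` and `LocalJoins` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LocalJoins {
    /** Hypothetical element: an int join key and a string payload. */
    static final class Rec {
        final int key;
        final String value;
        Rec(int key, String value) { this.key = key; this.value = value; }
    }

    /** Sort both inputs on the key, then merge them, emitting the cross product of equal-key runs. */
    static List<String> sortMergeJoin(List<Rec> r, List<Rec> s) {
        Comparator<Rec> byKey = Comparator.comparingInt(x -> x.key);
        List<Rec> a = new ArrayList<>(r);
        List<Rec> b = new ArrayList<>(s);
        a.sort(byKey);
        b.sort(byKey);
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            int ka = a.get(i).key, kb = b.get(j).key;
            if (ka < kb) {
                i++;
            } else if (ka > kb) {
                j++;
            } else {
                // Find the run of equal keys on each side and join the two runs.
                int iEnd = i, jEnd = j;
                while (iEnd < a.size() && a.get(iEnd).key == ka) iEnd++;
                while (jEnd < b.size() && b.get(jEnd).key == ka) jEnd++;
                for (int x = i; x < iEnd; x++)
                    for (int y = j; y < jEnd; y++)
                        out.add(a.get(x).value + "-" + b.get(y).value);
                i = iEnd;
                j = jEnd;
            }
        }
        return out;
    }

    /** Build a hash table on one input (ideally the smaller one), then probe it with the other. */
    static List<String> hashJoin(List<Rec> build, List<Rec> probe) {
        Map<Integer, List<Rec>> table = new HashMap<>();
        for (Rec b : build) table.computeIfAbsent(b.key, k -> new ArrayList<>()).add(b);
        List<String> out = new ArrayList<>();
        for (Rec p : probe)
            for (Rec b : table.getOrDefault(p.key, Collections.emptyList()))
                out.add(b.value + "-" + p.value);
        return out;
    }
}
```

Sort-merge pays for sorting both inputs but yields output ordered by key, which later operators may be able to reuse; the hash join only has to hold the build side in its table, which is why picking the smaller input as the build side matters.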
 
@@ -703,13 +703,13 @@ Tez as an execution backend instead of Flink’s own network stack. Learn more
 &lt;ul&gt;
   &lt;li&gt;1GB     : 1000GB&lt;/li&gt;
   &lt;li&gt;10GB    : 1000GB&lt;/li&gt;
-  &lt;li&gt;100GB   : 1000GB&lt;/li&gt;
+  &lt;li&gt;100GB   : 1000GB &lt;/li&gt;
   &lt;li&gt;1000GB  : 1000GB&lt;/li&gt;
 &lt;/ul&gt;
 
 &lt;p&gt;The Broadcast-Forward strategy is only executed for up to 10GB. 
Building a hash table from 100GB of broadcast data in 5GB working memory would 
result in spilling approximately 95GB (build input) + 950GB (probe input) in each 
parallel thread and require more than 8TB local disk storage on each 
machine.&lt;/p&gt;
 
-&lt;p&gt;As in the single-core benchmark, we run 1:N joins, generate the data on-the-fly, and immediately discard the result after the join. We run the benchmark on 10 n1-highmem-8 Google Compute Engine instances. Each instance is equipped with 8 cores, 52GB RAM, 40GB of which are configured as working memory (5GB per core), and one local SSD for spilling to disk. All benchmarks are performed using the same configuration, i.e., no fine tuning for the respective data sizes is done. The programs are executed with a parallelism of 80.&lt;/p&gt;
+&lt;p&gt;As in the single-core benchmark, we run 1:N joins, generate the data on-the-fly, and immediately discard the result after the join. We run the benchmark on 10 n1-highmem-8 Google Compute Engine instances. Each instance is equipped with 8 cores, 52GB RAM, 40GB of which are configured as working memory (5GB per core), and one local SSD for spilling to disk. All benchmarks are performed using the same configuration, i.e., no fine tuning for the respective data sizes is done. The programs are executed with a parallelism of 80. &lt;/p&gt;
 
 &lt;center&gt;
 &lt;img src=&quot;/img/blog/joins-dist-perf.png&quot; style=&quot;width:70%;margin:15px&quot; /&gt;
@@ -726,7 +726,7 @@ Tez as an execution backend instead of Flink’s own network stack. Learn more
 &lt;ul&gt;
   &lt;li&gt;Flink’s fluent Scala and Java APIs make joins and other data transformations easy as cake.&lt;/li&gt;
   &lt;li&gt;The optimizer does the hard choices for you, but gives you control in case you know better.&lt;/li&gt;
-  &lt;li&gt;Flink’s join implementations perform very good in-memory and gracefully degrade when going to disk.&lt;/li&gt;
+  &lt;li&gt;Flink’s join implementations perform very good in-memory and gracefully degrade when going to disk. &lt;/li&gt;
   &lt;li&gt;Due to Flink’s robust memory management, there is no need for job- or data-specific memory tuning to avoid a nasty &lt;code&gt;OutOfMemoryException&lt;/code&gt;. It just runs out-of-the-box.&lt;/li&gt;
 &lt;/ul&gt;
 
@@ -883,7 +883,7 @@ between the market data streams and a Twitter stream with stock mentions.&lt;/p&
 
 &lt;p&gt;For running the example implementation please use the &lt;em&gt;0.9-SNAPSHOT&lt;/em&gt; 
 version of Flink as a dependency. The full example code base can be 
-found &lt;a href=&quot;https://github.com/apache/flink/blob/master/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala&quot;&gt;here&lt;/a&gt; in Scala and &lt;a href=&quot;https://github.com/apache/flink/blob/master/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java&quot;&gt;here&lt;/a&gt; in Java7.&lt;/p&gt;
+found &lt;a href=&quot;https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala&quot;&gt;here&lt;/a&gt; in Scala and &lt;a href=&quot;https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java&quot;&gt;here&lt;/a&gt; in Java7.&lt;/p&gt;
 
 &lt;p&gt;&lt;a href=&quot;#top&quot;&gt;&lt;/a&gt;&lt;/p&gt;
 
@@ -897,7 +897,7 @@ found &lt;a href=&quot;https://github.com/apache/flink/blob/master/flink-staging
   &lt;li&gt;Read a socket stream of stock prices&lt;/li&gt;
   &lt;li&gt;Parse the text in the stream to create a stream of &lt;code&gt;StockPrice&lt;/code&gt; objects&lt;/li&gt;
   &lt;li&gt;Add four other sources tagged with the stock symbol.&lt;/li&gt;
-  &lt;li&gt;Finally, merge the streams to create a unified stream.&lt;/li&gt;
+  &lt;li&gt;Finally, merge the streams to create a unified stream. &lt;/li&gt;
 &lt;/ol&gt;
 
 &lt;p&gt;&lt;img alt=&quot;Reading from multiple inputs&quot; src=&quot;/img/blog/blog_multi_input.png&quot; width=&quot;70%&quot; class=&quot;img-responsive center-block&quot; /&gt;&lt;/p&gt;
@@ -953,10 +953,10 @@ found &lt;a href=&quot;https://github.com/apache/flink/blob/master/flink-staging
             &lt;span class=&quot;o&quot;&gt;});&lt;/span&gt;
 
     &lt;span class=&quot;c1&quot;&gt;//Generate other stock 
streams&lt;/span&gt;
-    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;SPX_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;SPX&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;10&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
-    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;FTSE_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;FTSE&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;20&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
-    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;DJI_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;DJI&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;30&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
-    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;BUX_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;BUX&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;40&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;SPX_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;SPX&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;10&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;FTSE_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;FTSE&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;20&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;DJI_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;DJI&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;30&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;BUX_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;BUX&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;40&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
 
     &lt;span class=&quot;c1&quot;&gt;//Merge all stock streams together&lt;/span&gt;
     &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;stockStream&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;socketStockStream&lt;/span&gt;
@@ -1037,11 +1037,11 @@ of this example, the data streams are simply generated using the
     &lt;span class=&quot;nd&quot;&gt;@Override&lt;/span&gt;
     &lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;invoke&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Collector&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;collector&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;throws&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Exception&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
         &lt;span class=&quot;n&quot;&gt;price&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;DEFAULT_PRICE&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;;&lt;/span&gt;
-        &lt;span class=&quot;n&quot;&gt;Random&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;random&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;Random&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;();&lt;/span&gt;
+        &lt;span class=&quot;n&quot;&gt;Random&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;random&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Random&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;();&lt;/span&gt;
 
         &lt;span class=&quot;k&quot;&gt;while&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;kc&quot;&gt;true&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
             &lt;span class=&quot;n&quot;&gt;price&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;price&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;+&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;random&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;nextGaussian&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;*&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;sigma&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;;&lt;/span&gt;
-            &lt;span class=&quot;n&quot;&gt;collector&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;collect&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;StockPrice&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;symbol&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;price&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;));&lt;/span&gt;
+            &lt;span class=&quot;n&quot;&gt;collector&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;collect&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;symbol&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;price&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;));&lt;/span&gt;
             &lt;span class=&quot;n&quot;&gt;Thread&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;sleep&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;random&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;nextInt&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;200&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;));&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
@@ -1128,7 +1128,7 @@ performed on named fields of POJOs, making the code more readable.&lt;/p&gt;
 &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;maxByStock&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;windowedStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;groupBy&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;s&quot;&gt;&amp;quot;symbol&amp;quot;&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;maxBy&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;s&quot;&gt;&amp;quot;price&amp;quot;&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;flatten&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;();&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;rollingMean&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;windowedStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;groupBy&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;s&quot;&gt;&amp;quot;symbol&amp;quot;&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt;
-    &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;mapWindow&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;WindowMean&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;()).&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;flatten&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;();&lt;/span&gt;
+    &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;mapWindow&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;WindowMean&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;()).&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;flatten&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;();&lt;/span&gt;
 
 &lt;span class=&quot;c1&quot;&gt;//Compute the mean of a window&lt;/span&gt;
 &lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;final&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;static&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;class&lt;/span&gt; &lt;span class=&quot;nc&quot;&gt;WindowMean&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;implements&lt;/span&gt; 
@@ -1143,12 +1143,12 @@ performed on named fields of POJOs, making the code more readable.&lt;/p&gt;
         &lt;span class=&quot;kd&quot;&gt;throws&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Exception&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
 
         &lt;span class=&quot;k&quot;&gt;if&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;values&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;iterator&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;().&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;hasNext&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;())&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;s&lt;/span&gt;
-            &lt;span class=&quot;nf&quot;&gt;for&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;StockPrice&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;sp&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;values&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
+            &lt;span class=&quot;k&quot;&gt;for&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;StockPrice&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;sp&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;values&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
                 &lt;span class=&quot;n&quot;&gt;sum&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;+=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;sp&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;price&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;;&lt;/span&gt;
                 &lt;span class=&quot;n&quot;&gt;symbol&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;sp&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;symbol&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;;&lt;/span&gt;
                 &lt;span class=&quot;n&quot;&gt;count&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;++;&lt;/span&gt;
             &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
-            &lt;span class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;collect&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;StockPrice&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;symbol&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;sum&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;/&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;count&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;));&lt;/span&gt;
+            &lt;span class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;collect&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;symbol&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;sum&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;/&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;count&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;));&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
 &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
@@ -1208,7 +1208,7 @@ every 30 seconds.&lt;/p&gt;
   &lt;div data-lang=&quot;java7&quot;&gt;
 
     &lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot; data-lang=&quot;java&quot;&gt;&lt;span class=&quot;kd&quot;&gt;private&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;static&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;final&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Double&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;DEFAULT_PRICE&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;mi&quot;&gt;1000&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.;&lt;/span&gt;
-&lt;span class=&quot;kd&quot;&gt;private&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;static&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;final&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;StockPrice&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;DEFAULT_STOCK_PRICE&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;StockPrice&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;s&quot;&gt;&amp;quot;&amp;quot;&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;DEFAULT_PRICE&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
+&lt;span class=&quot;kd&quot;&gt;private&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;static&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;final&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;StockPrice&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;DEFAULT_STOCK_PRICE&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;s&quot;&gt;&amp;quot;&amp;quot;&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;DEFAULT_PRICE&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
 
 &lt;span class=&quot;c1&quot;&gt;//Use delta policy to create price change warnings&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;priceWarnings&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;stockStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;groupBy&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;s&quot;&gt;&amp;quot;symbol&amp;quot;&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt;
@@ -1218,7 +1218,7 @@ every 30 seconds.&lt;/p&gt;
             &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Math&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;abs&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;oldDataPoint&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;price&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;-&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;newDataPoint&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;price&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;},&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;DEFAULT_STOCK_PRICE&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;))&lt;/span&gt;
-&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;mapWindow&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;SendWarning&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;()).&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;flatten&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;();&lt;/span&gt;
+&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;mapWindow&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;SendWarning&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;()).&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;flatten&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;();&lt;/span&gt;
 
 &lt;span class=&quot;c1&quot;&gt;//Count the number of warnings every half a minute&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Count&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;warningsPerStock&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;priceWarnings&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;map&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;MapFunction&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Count&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;()&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
@@ -1306,7 +1306,7 @@ but for the sake of this example we generate dummy tweet data.&lt;/p&gt;
   &lt;div data-lang=&quot;java7&quot;&gt;
 
     &lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot; data-lang=&quot;java&quot;&gt;&lt;span class=&quot;c1&quot;&gt;//Read a stream of tweets&lt;/span&gt;
-&lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;tweetStream&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;TweetSource&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;());&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;tweetStream&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;TweetSource&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;());&lt;/span&gt;
 
 &lt;span class=&quot;c1&quot;&gt;//Extract the stock symbols&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;mentionedSymbols&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;tweetStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;flatMap&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;
@@ -1339,8 +1339,8 @@ but for the sake of this example we generate dummy tweet data.&lt;/p&gt;
 
     &lt;span class=&quot;nd&quot;&gt;@Override&lt;/span&gt;
     &lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;invoke&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Collector&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;collector&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;throws&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Exception&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
-        &lt;span class=&quot;n&quot;&gt;random&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;Random&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;();&lt;/span&gt;
-        &lt;span class=&quot;n&quot;&gt;stringBuilder&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;StringBuilder&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;();&lt;/span&gt;
+        &lt;span class=&quot;n&quot;&gt;random&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Random&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;();&lt;/span&gt;
+        &lt;span class=&quot;n&quot;&gt;stringBuilder&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;StringBuilder&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;();&lt;/span&gt;
 
         &lt;span class=&quot;k&quot;&gt;while&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;kc&quot;&gt;true&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
             &lt;span class=&quot;n&quot;&gt;stringBuilder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setLength&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
@@ -1369,7 +1369,7 @@ number of mentions of a given stock in the Twitter stream. As both of
 these data streams are potentially infinite, we apply the join on a
 30-second window.&lt;/p&gt;
 
-&lt;p&gt;&lt;img alt=&quot;Streaming joins&quot; 
src=&quot;/img/blog/blog_stream_join.png&quot; width=&quot;60%&quot; 
class=&quot;img-responsive center-block&quot; /&gt;&lt;/p&gt;
+&lt;p&gt;&lt;img alt=&quot;Streaming joins&quot; 
src=&quot;/img/blog/blog_stream_join.png&quot; width=&quot;60%&quot; 
class=&quot;img-responsive center-block&quot; /&gt; &lt;/p&gt;
 
 &lt;div class=&quot;codetabs&quot;&gt;
 
@@ -1414,7 +1414,7 @@ these data streams are potentially infinite, we apply the join on a
     &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;equalTo&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;symbol&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;with&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;JoinFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&amp;gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
         &lt;span class=&quot;nd&quot;&gt;@Override&lt;/span&gt;
-        &lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;join&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Count&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;first&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Count&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;second&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;throws&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Exception&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
+        &lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;join&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Count&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;first&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Count&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;second&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;throws&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Exception&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
             &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;first&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;second&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
             &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;});&lt;/span&gt;
@@ -1422,7 +1422,7 @@ these data streams are potentially infinite, we apply the join on a
 &lt;span class=&quot;c1&quot;&gt;//Compute rolling correlation&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Double&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;rollingCorrelation&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;tweetsAndWarning&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;window&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Time&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;of&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;30&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;TimeUnit&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;SECONDS&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;))&lt;/span&gt;
-    &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;mapWindow&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;WindowCorrelation&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;
+    &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;mapWindow&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;WindowCorrelation&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;
 
 &lt;span class=&quot;n&quot;&gt;rollingCorrelation&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;print&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
 
@@ -1538,7 +1538,7 @@ internally, fault tolerance, and performance measurements!&lt;/p&gt;
 
 &lt;h3 
id=&quot;using-off-heap-memoryhttpsgithubcomapacheflinkpull290&quot;&gt;&lt;a 
href=&quot;https://github.com/apache/flink/pull/290&quot;&gt;Using off-heap 
memory&lt;/a&gt;&lt;/h3&gt;
 
-&lt;p&gt;This pull request enables Flink to use off-heap memory for its 
internal memory uses (sort, hash, caching of intermediate data sets).&lt;/p&gt;
+&lt;p&gt;This pull request enables Flink to use off-heap memory for its 
internal memory uses (sort, hash, caching of intermediate data sets). &lt;/p&gt;
 
 &lt;h3 
id=&quot;gelly-flinks-graph-apihttpsgithubcomapacheflinkpull335&quot;&gt;&lt;a 
href=&quot;https://github.com/apache/flink/pull/335&quot;&gt;Gelly, Flink’s 
Graph API&lt;/a&gt;&lt;/h3&gt;
 
@@ -1610,7 +1610,7 @@ internally, fault tolerance, and performance measurements!&lt;/p&gt;
   &lt;li&gt;Stefan Bunk&lt;/li&gt;
   &lt;li&gt;Paris Carbone&lt;/li&gt;
   &lt;li&gt;Ufuk Celebi&lt;/li&gt;
-  &lt;li&gt;Nils Engelbach&lt;/li&gt;
+  &lt;li&gt;Nils Engelbach &lt;/li&gt;
   &lt;li&gt;Stephan Ewen&lt;/li&gt;
   &lt;li&gt;Gyula Fora&lt;/li&gt;
   &lt;li&gt;Gabor Hermann&lt;/li&gt;
@@ -1715,7 +1715,7 @@ Flink serialization system improved a lot over time and by now surpasses the cap
 &lt;img src=&quot;/img/blog/hcompat-logos.png&quot; 
style=&quot;width:30%;margin:15px&quot; /&gt;
 &lt;/center&gt;
 
-&lt;p&gt;To close this gap, Flink provides a Hadoop Compatibility package to 
wrap functions implemented against Hadoop’s MapReduce interfaces and embed 
them in Flink programs. This package was developed as part of a &lt;a 
href=&quot;https://developers.google.com/open-source/soc/&quot;&gt;Google 
Summer of Code&lt;/a&gt; 2014 project.&lt;/p&gt;
+&lt;p&gt;To close this gap, Flink provides a Hadoop Compatibility package to 
wrap functions implemented against Hadoop’s MapReduce interfaces and embed 
them in Flink programs. This package was developed as part of a &lt;a 
href=&quot;https://developers.google.com/open-source/soc/&quot;&gt;Google 
Summer of Code&lt;/a&gt; 2014 project. &lt;/p&gt;
 
 &lt;p&gt;With the Hadoop Compatibility package, you can reuse all your 
Hadoop&lt;/p&gt;
 
@@ -1728,7 +1728,7 @@ Flink serialization system improved a lot over time and by now surpasses the cap
 
 &lt;p&gt;in Flink programs without changing a line of code. Moreover, Flink 
also natively supports all Hadoop data types 
(&lt;code&gt;Writables&lt;/code&gt; and 
&lt;code&gt;WritableComparable&lt;/code&gt;).&lt;/p&gt;
 
-&lt;p&gt;The following code snippet shows a simple Flink WordCount program 
that solely uses Hadoop data types, InputFormat, OutputFormat, Mapper, and 
Reducer functions.&lt;/p&gt;
+&lt;p&gt;The following code snippet shows a simple Flink WordCount program 
that solely uses Hadoop data types, InputFormat, OutputFormat, Mapper, and 
Reducer functions. &lt;/p&gt;
 
 &lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span class=&quot;c1&quot;&gt;// 
Definition of Hadoop Mapper function&lt;/span&gt;
 &lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;class&lt;/span&gt; &lt;span 
class=&quot;nc&quot;&gt;Tokenizer&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;implements&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Mapper&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;...&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;}&lt;/span&gt;
@@ -1744,25 +1744,25 @@ Flink serialization system improved a lot over time and by now surpasses the cap
   &lt;span class=&quot;c1&quot;&gt;// Setup Hadoop’s 
TextInputFormat&lt;/span&gt;
   &lt;span class=&quot;n&quot;&gt;HadoopInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;hadoopInputFormat&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; 
       &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;HadoopInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(&lt;/span&gt;
-        &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;TextInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;class&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;class&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;JobConf&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;
-  &lt;span class=&quot;n&quot;&gt;TextInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addInputPath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;hadoopInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getJobConf&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Path&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;inputPath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+        &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;TextInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;class&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;class&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;JobConf&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;
+  &lt;span class=&quot;n&quot;&gt;TextInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addInputPath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;hadoopInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getJobConf&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Path&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;inputPath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
   
   &lt;span class=&quot;c1&quot;&gt;// Read a DataSet with the Hadoop 
InputFormat&lt;/span&gt;
   &lt;span class=&quot;n&quot;&gt;DataSet&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;text&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;createInput&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;hadoopInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
   &lt;span class=&quot;n&quot;&gt;DataSet&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;words&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;text&lt;/span&gt;
     &lt;span class=&quot;c1&quot;&gt;// Wrap Tokenizer Mapper 
function&lt;/span&gt;
-    &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;flatMap&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;HadoopMapFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Tokenizer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()))&lt;/span&gt;
+    &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;flatMap&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;HadoopMapFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Tokenizer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()))&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;groupBy&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
     &lt;span class=&quot;c1&quot;&gt;// Wrap Counter Reducer function (used as 
Reducer and Combiner)&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;reduceGroup&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;HadoopReduceCombineFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(&lt;/span&gt;
-      &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Counter&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Counter&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()));&lt;/span&gt;
+      &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Counter&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Counter&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()));&lt;/span&gt;
         
   &lt;span class=&quot;c1&quot;&gt;// Setup Hadoop’s 
TextOutputFormat&lt;/span&gt;
   &lt;span class=&quot;n&quot;&gt;HadoopOutputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;hadoopOutputFormat&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; 
     &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;HadoopOutputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(&lt;/span&gt;
-      &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;TextOutputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(),&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;JobConf&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;
+      &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;TextOutputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(),&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;JobConf&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;
   &lt;span class=&quot;n&quot;&gt;hadoopOutputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getJobConf&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;().&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;set&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;mapred.textoutputformat.separator&amp;quot;&lt;/span&gt;&lt;span
 class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;s&quot;&gt;&amp;quot; &amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
-  &lt;span class=&quot;n&quot;&gt;TextOutputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setOutputPath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;hadoopOutputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getJobConf&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Path&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;outputPath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+  &lt;span class=&quot;n&quot;&gt;TextOutputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setOutputPath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;hadoopOutputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getJobConf&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Path&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;outputPath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
         
   &lt;span class=&quot;c1&quot;&gt;// Output &amp;amp; Execute&lt;/span&gt;
   &lt;span class=&quot;n&quot;&gt;words&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;output&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;hadoopOutputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
@@ -1814,7 +1814,7 @@ Flink serialization system improved a lot over time and by now surpasses the cap
 
 &lt;p&gt;&lt;strong&gt;Record API deprecated:&lt;/strong&gt; The (old) 
Stratosphere Record API has been marked as deprecated and is planned for 
removal in the 0.9.0 release.&lt;/p&gt;
 
-&lt;p&gt;&lt;strong&gt;BLOB service:&lt;/strong&gt; This release contains a 
new service to distribute jar files and other binary data among the JobManager, 
TaskManagers and the client.&lt;/p&gt;
+&lt;p&gt;&lt;strong&gt;BLOB service:&lt;/strong&gt; This release contains a 
new service to distribute jar files and other binary data among the JobManager, 
TaskManagers and the client. &lt;/p&gt;
 
 &lt;p&gt;&lt;strong&gt;Intermediate data sets:&lt;/strong&gt; A major rewrite 
of the system internals introduces intermediate data sets as first class 
citizens. The internal state machine that tracks the distributed tasks has also 
been completely rewritten for scalability. While this is not visible as a 
user-facing feature yet, it is the foundation for several upcoming exciting 
features.&lt;/p&gt;
 
@@ -2250,7 +2250,7 @@ Applying students can use our wiki (create a new page) to create a project propo
 ssh [email protected] -i 
~/Downloads/work-laptop.pem&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
 
 &lt;p&gt;(Windows users have to follow &lt;a 
href=&quot;http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-connect-master-node-ssh.html&quot;&gt;these
 instructions&lt;/a&gt; to SSH into the machine running the master.) 
&amp;lt;/br&amp;gt;&amp;lt;/br&amp;gt;
-Once connected to the master, download and start Stratosphere for 
YARN:&lt;/p&gt;
+Once connected to the master, download and start Stratosphere for YARN: 
&lt;/p&gt;
 &lt;ul&gt;
        &lt;li&gt;Download and extract Stratosphere-YARN&lt;/li&gt;
 
@@ -2262,7 +2262,7 @@ tar xvzf stratosphere-dist-0.5-SNAPSHOT-yarn.tar.gz&lt;/code&gt;&lt;/pre&gt;&lt;
 
 
 &lt;div class=&quot;highlight&quot;&gt;&lt;

<TRUNCATED>