http://git-wip-us.apache.org/repos/asf/flink-web/blob/d8883b04/content/news/2015/02/09/streaming-example.html ---------------------------------------------------------------------- diff --git a/content/news/2015/02/09/streaming-example.html b/content/news/2015/02/09/streaming-example.html deleted file mode 100644 index 198d0d7..0000000 --- a/content/news/2015/02/09/streaming-example.html +++ /dev/null @@ -1,846 +0,0 @@ -<!DOCTYPE html> -<html lang="en"> - <head> - <meta charset="utf-8"> - <meta http-equiv="X-UA-Compatible" content="IE=edge"> - <meta name="viewport" content="width=device-width, initial-scale=1"> - <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags --> - <title>Apache Flink: Introducing Flink Streaming</title> - <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon"> - <link rel="icon" href="/favicon.ico" type="image/x-icon"> - - <!-- Bootstrap --> - <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css"> - <link rel="stylesheet" href="/css/flink.css"> - <link rel="stylesheet" href="/css/syntax.css"> - - <!-- Blog RSS feed --> - <link href="/blog/feed.xml" rel="alternate" type="application/rss+xml" title="Apache Flink Blog: RSS feed" /> - - <!-- jQuery (necessary for Bootstrap's JavaScript plugins) --> - <!-- We need to load Jquery in the header for custom google analytics event tracking--> - <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script> - - <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> - <!-- WARNING: Respond.js doesn't work if you view the page via file:// --> - <!--[if lt IE 9]> - <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> - <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> - <![endif]--> - </head> - <body> - - - <!-- Top navbar. 
--> - <nav class="navbar navbar-default navbar-fixed-top"> - <div class="container"> - <!-- The logo. --> - <div class="navbar-header"> - <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1"> - <span class="icon-bar"></span> - <span class="icon-bar"></span> - <span class="icon-bar"></span> - </button> - <div class="navbar-logo"> - <a href="/"> - <img alt="Apache Flink" src="/img/navbar-brand-logo.jpg" width="78px" height="40px"> - </a> - </div> - </div><!-- /.navbar-header --> - - <!-- The navigation links. --> - <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1"> - <ul class="nav navbar-nav"> - - <!-- Overview --> - <li><a href="/index.html">Overview</a></li> - - <!-- Features --> - <li><a href="/features.html">Features</a></li> - - <!-- Downloads --> - <li><a href="/downloads.html">Downloads</a></li> - - <!-- FAQ --> - <li><a href="/faq.html">FAQ</a></li> - - - <!-- Quickstart --> - <li class="dropdown"> - <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false"><small><span class="glyphicon glyphicon-new-window"></span></small> Quickstart <span class="caret"></span></a> - <ul class="dropdown-menu" role="menu"> - <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1/quickstart/setup_quickstart.html">Setup</a></li> - <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1/quickstart/run_example_quickstart.html">Example: Wikipedia Edit Stream</a></li> - <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1/quickstart/java_api_quickstart.html">Java API</a></li> - <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1/quickstart/scala_api_quickstart.html">Scala API</a></li> - </ul> - </li> - - <!-- Documentation --> - <li class="dropdown"> - <a href="" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false"><small><span class="glyphicon 
glyphicon-new-window"></span></small> Documentation <span class="caret"></span></a> - <ul class="dropdown-menu" role="menu"> - <!-- Latest stable release --> - <li role="presentation" class="dropdown-header"><strong>Latest Release</strong> (Stable)</li> - <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1">1.1 Documentation</a></li> - <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java" class="active">1.1 Javadocs</a></li> - <!--<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1/api/scala/index.html" class="active">1.1 ScalaDocs</a></li> --> - - <!-- Snapshot docs --> - <li class="divider"></li> - <li role="presentation" class="dropdown-header"><strong>Snapshot</strong> (Development)</li> - <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.2">1.2 Documentation</a></li> - <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.2/api/java" class="active">1.2 Javadocs</a></li> - <!--<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.2/api/scala/index.html" class="active">1.2 ScalaDocs</a></li> --> - - <!-- Wiki --> - <li class="divider"></li> - <li><a href="/visualizer/"><small><span class="glyphicon glyphicon-new-window"></span></small> Plan Visualizer</a></li> - <li><a href="https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home"><small><span class="glyphicon glyphicon-new-window"></span></small> Wiki</a></li> - </ul> - </li> - - </ul> - - <ul class="nav navbar-nav navbar-right"> - <!-- Blog --> - <li class=" active hidden-md hidden-sm"><a href="/blog/">Blog</a></li> - - <li class="dropdown hidden-md hidden-sm"> - <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Community <span class="caret"></span></a> - <ul class="dropdown-menu" role="menu"> - <!-- Community --> - <li role="presentation" class="dropdown-header"><strong>Community</strong></li> - <li><a 
href="/community.html#mailing-lists">Mailing Lists</a></li> - <li><a href="/community.html#irc">IRC</a></li> - <li><a href="/community.html#stack-overflow">Stack Overflow</a></li> - <li><a href="/community.html#issue-tracker">Issue Tracker</a></li> - <li><a href="/community.html#third-party-packages">Third Party Packages</a></li> - <li><a href="/community.html#source-code">Source Code</a></li> - <li><a href="/community.html#people">People</a></li> - <li><a href="/poweredby.html">Powered by Flink</a></li> - - <!-- Contribute --> - <li class="divider"></li> - <li role="presentation" class="dropdown-header"><strong>Contribute</strong></li> - <li><a href="/how-to-contribute.html">How to Contribute</a></li> - <li><a href="/contribute-code.html">Contribute Code</a></li> - <li><a href="/contribute-documentation.html">Contribute Documentation</a></li> - <li><a href="/improve-website.html">Improve the Website</a></li> - <li><a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals"><small><span class="glyphicon glyphicon-new-window"></span></small> Flink Improvement Proposals (Design Docs)</a></li> - </ul> - </li> - - <li class="dropdown hidden-md hidden-sm"> - <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Project <span class="caret"></span></a> - <ul class="dropdown-menu" role="menu"> - <!-- Project --> - <li role="presentation" class="dropdown-header"><strong>Project</strong></li> - <li><a href="/slides.html">Slides</a></li> - <li><a href="/material.html">Material</a></li> - <li><a href="https://twitter.com/apacheflink"><small><span class="glyphicon glyphicon-new-window"></span></small> Twitter</a></li> - <li><a href="https://github.com/apache/flink"><small><span class="glyphicon glyphicon-new-window"></span></small> GitHub</a></li> - <li><a href="https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home"><small><span class="glyphicon glyphicon-new-window"></span></small> Wiki</a></li> 
- </ul> - </li> - </ul> - </div><!-- /.navbar-collapse --> - </div><!-- /.container --> - </nav> - - - <!-- Main content. --> - <div class="container"> - - -<div class="row"> - <div class="col-sm-8 col-sm-offset-2"> - <div class="row"> - <h1>Introducing Flink Streaming</h1> - - <article> - <p>09 Feb 2015</p> - -<p>This post is the first of a series of blog posts on Flink Streaming, -the recent addition to Apache Flink that makes it possible to analyze -continuous data sources in addition to static files. Flink Streaming -uses the pipelined Flink engine to process data streams in real time -and offers a new API including definition of flexible windows.</p> - -<p>In this post, we go through an example that uses the Flink Streaming -API to compute statistics on stock market data that arrive -continuously and combine the stock market data with Twitter streams. -See the <a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html">Streaming Programming -Guide</a> for a -detailed presentation of the Streaming API.</p> - -<p>First, we read a bunch of stock price streams and combine them into -one stream of market data. We apply several transformations on this -market data stream, like rolling aggregations per stock. Then we emit -price warning alerts when the prices are rapidly changing. Moving -towards more advanced features, we compute rolling correlations -between the market data streams and a Twitter stream with stock mentions.</p> - -<p>For running the example implementation please use the <em>0.9-SNAPSHOT</em> -version of Flink as a dependency. 
The full example code base can be -found <a href="https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala">here</a> in Scala and <a href="https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java">here</a> in Java7.</p> - -<p><a href="#top"></a></p> - -<p><a href="#top">Back to top</a></p> - -<h2 id="reading-from-multiple-inputs">Reading from multiple inputs</h2> - -<p>First, let us create the stream of stock prices:</p> - -<ol> - <li>Read a socket stream of stock prices</li> - <li>Parse the text in the stream to create a stream of <code>StockPrice</code> objects</li> - <li>Add four other sources tagged with the stock symbol.</li> - <li>Finally, merge the streams to create a unified stream.</li> -</ol> - -<p><img alt="Reading from multiple inputs" src="/img/blog/blog_multi_input.png" width="70%" class="img-responsive center-block" /></p> - -<div class="codetabs"> - <div data-lang="scala"> - - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="n">main</span><span class="o">(</span><span class="n">args</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="o">{</span> - - <span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">StreamExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span> - - <span class="c1">//Read from a socket stream and map it to StockPrice objects</span> - <span class="k">val</span> <span class="n">socketStockStream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">socketTextStream</span><span 
class="o">(</span><span class="s">"localhost"</span><span class="o">,</span> <span class="mi">9999</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=></span> <span class="o">{</span> - <span class="k">val</span> <span class="n">split</span> <span class="k">=</span> <span class="n">x</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">","</span><span class="o">)</span> - <span class="nc">StockPrice</span><span class="o">(</span><span class="n">split</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">split</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="n">toDouble</span><span class="o">)</span> - <span class="o">})</span> - - <span class="c1">//Generate other stock streams</span> - <span class="k">val</span> <span class="nc">SPX_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">"SPX"</span><span class="o">)(</span><span class="mi">10</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span> - <span class="k">val</span> <span class="nc">FTSE_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">"FTSE"</span><span class="o">)(</span><span class="mi">20</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span> - <span class="k">val</span> <span class="nc">DJI_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span 
class="s">"DJI"</span><span class="o">)(</span><span class="mi">30</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span> - <span class="k">val</span> <span class="nc">BUX_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">"BUX"</span><span class="o">)(</span><span class="mi">40</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span> - - <span class="c1">//Merge all stock streams together</span> - <span class="k">val</span> <span class="n">stockStream</span> <span class="k">=</span> <span class="n">socketStockStream</span><span class="o">.</span><span class="n">merge</span><span class="o">(</span><span class="nc">SPX_Stream</span><span class="o">,</span> <span class="nc">FTSE_Stream</span><span class="o">,</span> - <span class="nc">DJI_Stream</span><span class="o">,</span> <span class="nc">BUX_Stream</span><span class="o">)</span> - - <span class="n">stockStream</span><span class="o">.</span><span class="n">print</span><span class="o">()</span> - - <span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="o">(</span><span class="s">"Stock stream"</span><span class="o">)</span> -<span class="o">}</span></code></pre></div> - - </div> - <div data-lang="java7"> - - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> - - <span class="kd">final</span> <span class="n">StreamExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> - <span 
class="n">StreamExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span> - - <span class="c1">//Read from a socket stream and map it to StockPrice objects</span> - <span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">socketStockStream</span> <span class="o">=</span> <span class="n">env</span> - <span class="o">.</span><span class="na">socketTextStream</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">,</span> <span class="mi">9999</span><span class="o">)</span> - <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">StockPrice</span><span class="o">>()</span> <span class="o">{</span> - <span class="kd">private</span> <span class="n">String</span><span class="o">[]</span> <span class="n">tokens</span><span class="o">;</span> - - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="n">StockPrice</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> - <span class="n">tokens</span> <span class="o">=</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">","</span><span class="o">);</span> - <span class="k">return</span> <span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">tokens</span><span class="o">[</span><span class="mi">0</span><span class="o">],</span> - <span class="n">Double</span><span class="o">.</span><span class="na">parseDouble</span><span class="o">(</span><span class="n">tokens</span><span 
class="o">[</span><span class="mi">1</span><span class="o">]));</span> - <span class="o">}</span> - <span class="o">});</span> - - <span class="c1">//Generate other stock streams</span> - <span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">SPX_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">"SPX"</span><span class="o">,</span> <span class="mi">10</span><span class="o">));</span> - <span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">FTSE_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">"FTSE"</span><span class="o">,</span> <span class="mi">20</span><span class="o">));</span> - <span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">DJI_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">"DJI"</span><span class="o">,</span> <span class="mi">30</span><span class="o">));</span> - <span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">BUX_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span 
class="s">"BUX"</span><span class="o">,</span> <span class="mi">40</span><span class="o">));</span> - - <span class="c1">//Merge all stock streams together</span> - <span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">stockStream</span> <span class="o">=</span> <span class="n">socketStockStream</span> - <span class="o">.</span><span class="na">merge</span><span class="o">(</span><span class="n">SPX_stream</span><span class="o">,</span> <span class="n">FTSE_stream</span><span class="o">,</span> <span class="n">DJI_stream</span><span class="o">,</span> <span class="n">BUX_stream</span><span class="o">);</span> - - <span class="n">stockStream</span><span class="o">.</span><span class="na">print</span><span class="o">();</span> - - <span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">"Stock stream"</span><span class="o">);</span></code></pre></div> - - </div> -</div> - -<p>See -<a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#data-sources">here</a> -on how you can create streaming sources for Flink Streaming -programs. Flink, of course, has support for reading in streams from -<a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/index.html">external -sources</a> -such as Apache Kafka, Apache Flume, RabbitMQ, and others. 
For the sake -of this example, the data streams are simply generated using the -<code>generateStock</code> method:</p> - -<div class="codetabs"> - <div data-lang="scala"> - - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">symbols</span> <span class="k">=</span> <span class="nc">List</span><span class="o">(</span><span class="s">"SPX"</span><span class="o">,</span> <span class="s">"FTSE"</span><span class="o">,</span> <span class="s">"DJI"</span><span class="o">,</span> <span class="s">"DJT"</span><span class="o">,</span> <span class="s">"BUX"</span><span class="o">,</span> <span class="s">"DAX"</span><span class="o">,</span> <span class="s">"GOOG"</span><span class="o">)</span> - -<span class="k">case</span> <span class="k">class</span> <span class="nc">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">price</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span> - -<span class="k">def</span> <span class="n">generateStock</span><span class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span class="kt">String</span><span class="o">)(</span><span class="n">sigma</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)(</span><span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span> - <span class="k">var</span> <span class="n">price</span> <span class="k">=</span> <span class="mf">1000.</span> - <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span> - <span class="n">price</span> <span class="k">=</span> <span class="n">price</span> <span class="o">+</span> <span 
class="nc">Random</span><span class="o">.</span><span class="n">nextGaussian</span> <span class="o">*</span> <span class="n">sigma</span> - <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="nc">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">price</span><span class="o">))</span> - <span class="nc">Thread</span><span class="o">.</span><span class="n">sleep</span><span class="o">(</span><span class="nc">Random</span><span class="o">.</span><span class="n">nextInt</span><span class="o">(</span><span class="mi">200</span><span class="o">))</span> - <span class="o">}</span> -<span class="o">}</span></code></pre></div> - - </div> - <div data-lang="java7"> - - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">ArrayList</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">SYMBOLS</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o"><</span><span class="n">String</span><span class="o">>(</span> - <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"SPX"</span><span class="o">,</span> <span class="s">"FTSE"</span><span class="o">,</span> <span class="s">"DJI"</span><span class="o">,</span> <span class="s">"DJT"</span><span class="o">,</span> <span class="s">"BUX"</span><span class="o">,</span> <span class="s">"DAX"</span><span class="o">,</span> <span class="s">"GOOG"</span><span class="o">));</span> - -<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">StockPrice</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span> - - <span 
class="kd">public</span> <span class="n">String</span> <span class="n">symbol</span><span class="o">;</span> - <span class="kd">public</span> <span class="n">Double</span> <span class="n">price</span><span class="o">;</span> - - <span class="kd">public</span> <span class="nf">StockPrice</span><span class="o">()</span> <span class="o">{</span> - <span class="o">}</span> - - <span class="kd">public</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">String</span> <span class="n">symbol</span><span class="o">,</span> <span class="n">Double</span> <span class="n">price</span><span class="o">)</span> <span class="o">{</span> - <span class="k">this</span><span class="o">.</span><span class="na">symbol</span> <span class="o">=</span> <span class="n">symbol</span><span class="o">;</span> - <span class="k">this</span><span class="o">.</span><span class="na">price</span> <span class="o">=</span> <span class="n">price</span><span class="o">;</span> - <span class="o">}</span> - - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="n">String</span> <span class="nf">toString</span><span class="o">()</span> <span class="o">{</span> - <span class="k">return</span> <span class="s">"StockPrice{"</span> <span class="o">+</span> - <span class="s">"symbol='"</span> <span class="o">+</span> <span class="n">symbol</span> <span class="o">+</span> <span class="sc">'\''</span> <span class="o">+</span> - <span class="s">", count="</span> <span class="o">+</span> <span class="n">price</span> <span class="o">+</span> - <span class="sc">'}'</span><span class="o">;</span> - <span class="o">}</span> -<span class="o">}</span> - -<span class="kd">public</span> <span class="kd">final</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">StockSource</span> <span class="kd">implements</span> <span class="n">SourceFunction</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span 
class="o">{</span> - - <span class="kd">private</span> <span class="n">Double</span> <span class="n">price</span><span class="o">;</span> - <span class="kd">private</span> <span class="n">String</span> <span class="n">symbol</span><span class="o">;</span> - <span class="kd">private</span> <span class="n">Integer</span> <span class="n">sigma</span><span class="o">;</span> - - <span class="kd">public</span> <span class="nf">StockSource</span><span class="o">(</span><span class="n">String</span> <span class="n">symbol</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">sigma</span><span class="o">)</span> <span class="o">{</span> - <span class="k">this</span><span class="o">.</span><span class="na">symbol</span> <span class="o">=</span> <span class="n">symbol</span><span class="o">;</span> - <span class="k">this</span><span class="o">.</span><span class="na">sigma</span> <span class="o">=</span> <span class="n">sigma</span><span class="o">;</span> - <span class="o">}</span> - - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="nf">invoke</span><span class="o">(</span><span class="n">Collector</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">collector</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> - <span class="n">price</span> <span class="o">=</span> <span class="n">DEFAULT_PRICE</span><span class="o">;</span> - <span class="n">Random</span> <span class="n">random</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Random</span><span class="o">();</span> - - <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span> - <span class="n">price</span> <span class="o">=</span> <span class="n">price</span> <span class="o">+</span> <span class="n">random</span><span 
class="o">.</span><span class="na">nextGaussian</span><span class="o">()</span> <span class="o">*</span> <span class="n">sigma</span><span class="o">;</span> - <span class="n">collector</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">price</span><span class="o">));</span> - <span class="n">Thread</span><span class="o">.</span><span class="na">sleep</span><span class="o">(</span><span class="n">random</span><span class="o">.</span><span class="na">nextInt</span><span class="o">(</span><span class="mi">200</span><span class="o">));</span> - <span class="o">}</span> - <span class="o">}</span> -<span class="o">}</span></code></pre></div> - - </div> -</div> - -<p>To read from the text socket stream, please make sure that you have a -socket running. For the sake of the example, executing the following -command in a terminal does the job. You can get -<a href="http://netcat.sourceforge.net/">netcat</a> if it is not available -on your machine.</p> - -<div class="highlight"><pre><code>nc -lk 9999 -</code></pre></div> - -<p>If we execute the program from our IDE, we see the system log followed by the -stock prices being generated:</p> - -<div class="highlight"><pre><code>INFO Job execution switched to status RUNNING. 
-INFO Socket Stream(1/1) switched to SCHEDULED -INFO Socket Stream(1/1) switched to DEPLOYING -INFO Custom Source(1/1) switched to SCHEDULED -INFO Custom Source(1/1) switched to DEPLOYING -… -1> StockPrice{symbol='SPX', count=1011.3405732645239} -2> StockPrice{symbol='SPX', count=1018.3381290039248} -1> StockPrice{symbol='DJI', count=1036.7454894073978} -3> StockPrice{symbol='DJI', count=1135.1170217478427} -3> StockPrice{symbol='BUX', count=1053.667523187687} -4> StockPrice{symbol='BUX', count=1036.552601487263} -</code></pre></div> - -<p><a href="#top">Back to top</a></p> - -<h2 id="window-aggregations">Window aggregations</h2> - -<p>We first compute aggregations on time-based windows of the -data. Flink provides <a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html">flexible windowing semantics</a> where windows can -also be defined based on count of records or any custom user-defined -logic.</p> - -<p>We partition our stream into windows of 10 seconds and slide the -window every 5 seconds. We compute three statistics every 5 seconds. -The first is the minimum price of all stocks, the second is the -maximum price per stock, and the third is the mean stock price -(using a map window function). 
Aggregations and groupings can be -performed on named fields of POJOs, making the code more readable.</p> - -<p><img alt="Basic windowing aggregations" src="/img/blog/blog_basic_window.png" width="70%" class="img-responsive center-block" /></p> - -<div class="codetabs"> - - <div data-lang="scala"> - - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">//Define the desired time window</span> -<span class="k">val</span> <span class="n">windowedStream</span> <span class="k">=</span> <span class="n">stockStream</span> - <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">)).</span><span class="n">every</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">5</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span> - -<span class="c1">//Compute some simple statistics on a rolling window</span> -<span class="k">val</span> <span class="n">lowest</span> <span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">minBy</span><span class="o">(</span><span class="s">"price"</span><span class="o">)</span> -<span class="k">val</span> <span class="n">maxByStock</span> <span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">).</span><span class="n">maxBy</span><span class="o">(</span><span class="s">"price"</span><span class="o">)</span> -<span class="k">val</span> <span class="n">rollingMean</span> <span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">groupBy</span><span 
class="o">(</span><span class="s">"symbol"</span><span class="o">).</span><span class="n">mapWindow</span><span class="o">(</span><span class="n">mean</span> <span class="k">_</span><span class="o">)</span> - -<span class="c1">//Compute the mean of a window</span> -<span class="k">def</span> <span class="n">mean</span><span class="o">(</span><span class="n">ts</span><span class="k">:</span> <span class="kt">Iterable</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">],</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span> - <span class="k">if</span> <span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">nonEmpty</span><span class="o">)</span> <span class="o">{</span> - <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="nc">StockPrice</span><span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">head</span><span class="o">.</span><span class="n">symbol</span><span class="o">,</span> <span class="n">ts</span><span class="o">.</span><span class="n">foldLeft</span><span class="o">(</span><span class="mi">0</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)(</span><span class="k">_</span> <span class="o">+</span> <span class="k">_</span><span class="o">.</span><span class="n">price</span><span class="o">)</span> <span class="o">/</span> <span class="n">ts</span><span class="o">.</span><span class="n">size</span><span class="o">))</span> - <span class="o">}</span> -<span class="o">}</span></code></pre></div> - - </div> - - <div data-lang="java7"> - - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">//Define the desired time window</span> -<span 
class="n">WindowedDataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">windowedStream</span> <span class="o">=</span> <span class="n">stockStream</span> - <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">))</span> - <span class="o">.</span><span class="na">every</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">5</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">));</span> - -<span class="c1">//Compute some simple statistics on a rolling window</span> -<span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">lowest</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="na">minBy</span><span class="o">(</span><span class="s">"price"</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span> -<span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">maxByStock</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> - <span class="o">.</span><span class="na">maxBy</span><span class="o">(</span><span class="s">"price"</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span> -<span class="n">DataStream</span><span class="o"><</span><span 
class="n">StockPrice</span><span class="o">></span> <span class="n">rollingMean</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> - <span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="nf">WindowMean</span><span class="o">()).</span><span class="na">flatten</span><span class="o">();</span> - -<span class="c1">//Compute the mean of a window</span> -<span class="kd">public</span> <span class="kd">final</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">WindowMean</span> <span class="kd">implements</span> - <span class="n">WindowMapFunction</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">,</span> <span class="n">StockPrice</span><span class="o">></span> <span class="o">{</span> - - <span class="kd">private</span> <span class="n">Double</span> <span class="n">sum</span> <span class="o">=</span> <span class="mf">0.0</span><span class="o">;</span> - <span class="kd">private</span> <span class="n">Integer</span> <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> - <span class="kd">private</span> <span class="n">String</span> <span class="n">symbol</span> <span class="o">=</span> <span class="s">""</span><span class="o">;</span> - - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="nf">mapWindow</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">out</span><span class="o">)</span> - <span 
class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> - - <span class="k">if</span> <span class="o">(</span><span class="n">values</span><span class="o">.</span><span class="na">iterator</span><span class="o">().</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span> - <span class="k">for</span> <span class="o">(</span><span class="n">StockPrice</span> <span class="n">sp</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span> - <span class="n">sum</span> <span class="o">+=</span> <span class="n">sp</span><span class="o">.</span><span class="na">price</span><span class="o">;</span> - <span class="n">symbol</span> <span class="o">=</span> <span class="n">sp</span><span class="o">.</span><span class="na">symbol</span><span class="o">;</span> - <span class="n">count</span><span class="o">++;</span> - <span class="o">}</span> - <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">sum</span> <span class="o">/</span> <span class="n">count</span><span class="o">));</span> - <span class="o">}</span> - <span class="o">}</span> -<span class="o">}</span></code></pre></div> - - </div> - -</div> - -<p>Let us note that to print a windowed stream one has to flatten it first, -thus getting rid of the windowing logic. For example, execute -<code>maxByStock.flatten().print()</code> to print the stream of maximum prices of - the time windows by stock. For Scala, <code>flatten()</code> is called implicitly -when needed.</p> - -<p><a href="#top">Back to top</a></p> - -<h2 id="data-driven-windows">Data-driven windows</h2> - -<p>The most interesting event in the stream is when the price of a stock -is changing rapidly. 
We can send a warning when a stock price changes -more than 5% since the last warning. To do that, we use a delta-based window providing a -threshold on when the computation will be triggered, a function to -compute the difference and a default value with which the first record -is compared. We also create a <code>Count</code> data type to count the warnings -every 30 seconds.</p> - -<p><img alt="Data-driven windowing semantics" src="/img/blog/blog_data_driven.png" width="100%" class="img-responsive center-block" /></p> - -<div class="codetabs"> - - <div data-lang="scala"> - - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">case</span> <span class="k">class</span> <span class="nc">Count</span><span class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">count</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> -<span class="k">val</span> <span class="n">defaultPrice</span> <span class="k">=</span> <span class="nc">StockPrice</span><span class="o">(</span><span class="s">""</span><span class="o">,</span> <span class="mi">1000</span><span class="o">)</span> - -<span class="c1">//Use delta policy to create price change warnings</span> -<span class="k">val</span> <span class="n">priceWarnings</span> <span class="k">=</span> <span class="n">stockStream</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> - <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Delta</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mf">0.05</span><span class="o">,</span> <span class="n">priceChange</span><span class="o">,</span> <span class="n">defaultPrice</span><span class="o">))</span> - <span class="o">.</span><span class="n">mapWindow</span><span 
class="o">(</span><span class="n">sendWarning</span> <span class="k">_</span><span class="o">)</span> - -<span class="c1">//Count the number of warnings every half a minute</span> -<span class="k">val</span> <span class="n">warningsPerStock</span> <span class="k">=</span> <span class="n">priceWarnings</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="nc">Count</span><span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span> - <span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> - <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span> - <span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="s">"count"</span><span class="o">)</span> - -<span class="k">def</span> <span class="n">priceChange</span><span class="o">(</span><span class="n">p1</span><span class="k">:</span> <span class="kt">StockPrice</span><span class="o">,</span> <span class="n">p2</span><span class="k">:</span> <span class="kt">StockPrice</span><span class="o">)</span><span class="k">:</span> <span class="kt">Double</span> <span class="o">=</span> <span class="o">{</span> - <span class="nc">Math</span><span class="o">.</span><span class="n">abs</span><span class="o">(</span><span class="n">p1</span><span class="o">.</span><span class="n">price</span> <span class="o">/</span> <span class="n">p2</span><span class="o">.</span><span class="n">price</span> <span class="o">-</span> <span class="mi">1</span><span class="o">)</span> -<span class="o">}</span> - -<span class="k">def</span> <span class="n">sendWarning</span><span class="o">(</span><span 
class="n">ts</span><span class="k">:</span> <span class="kt">Iterable</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">],</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span> - <span class="k">if</span> <span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">nonEmpty</span><span class="o">)</span> <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">head</span><span class="o">.</span><span class="n">symbol</span><span class="o">)</span> -<span class="o">}</span></code></pre></div> - - </div> - - <div data-lang="java7"> - - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">Double</span> <span class="n">DEFAULT_PRICE</span> <span class="o">=</span> <span class="mi">1000</span><span class="o">.;</span> -<span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">StockPrice</span> <span class="n">DEFAULT_STOCK_PRICE</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="s">""</span><span class="o">,</span> <span class="n">DEFAULT_PRICE</span><span class="o">);</span> - -<span class="c1">//Use delta policy to create price change warnings</span> -<span class="n">DataStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">priceWarnings</span> <span class="o">=</span> <span class="n">stockStream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span 
class="o">)</span> - <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Delta</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mf">0.05</span><span class="o">,</span> <span class="k">new</span> <span class="n">DeltaFunction</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">>()</span> <span class="o">{</span> - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">double</span> <span class="nf">getDelta</span><span class="o">(</span><span class="n">StockPrice</span> <span class="n">oldDataPoint</span><span class="o">,</span> <span class="n">StockPrice</span> <span class="n">newDataPoint</span><span class="o">)</span> <span class="o">{</span> - <span class="k">return</span> <span class="n">Math</span><span class="o">.</span><span class="na">abs</span><span class="o">(</span><span class="n">oldDataPoint</span><span class="o">.</span><span class="na">price</span> <span class="o">-</span> <span class="n">newDataPoint</span><span class="o">.</span><span class="na">price</span><span class="o">);</span> - <span class="o">}</span> - <span class="o">},</span> <span class="n">DEFAULT_STOCK_PRICE</span><span class="o">))</span> -<span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="nf">SendWarning</span><span class="o">()).</span><span class="na">flatten</span><span class="o">();</span> - -<span class="c1">//Count the number of warnings every half a minute</span> -<span class="n">DataStream</span><span class="o"><</span><span class="n">Count</span><span class="o">></span> <span class="n">warningsPerStock</span> <span class="o">=</span> <span class="n">priceWarnings</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o"><</span><span class="n">String</span><span 
class="o">,</span> <span class="n">Count</span><span class="o">>()</span> <span class="o">{</span> - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="n">Count</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> - <span class="k">return</span> <span class="k">new</span> <span class="nf">Count</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="mi">1</span><span class="o">);</span> - <span class="o">}</span> -<span class="o">}).</span><span class="na">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">).</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">)).</span><span class="na">sum</span><span class="o">(</span><span class="s">"count"</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span> - -<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Count</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span> - <span class="kd">public</span> <span class="n">String</span> <span class="n">symbol</span><span class="o">;</span> - <span class="kd">public</span> <span class="n">Integer</span> <span class="n">count</span><span class="o">;</span> - - <span class="kd">public</span> <span class="nf">Count</span><span class="o">()</span> <span class="o">{</span> - <span class="o">}</span> - - <span class="kd">public</span> <span class="nf">Count</span><span class="o">(</span><span class="n">String</span> 
<span class="n">symbol</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">count</span><span class="o">)</span> <span class="o">{</span> - <span class="k">this</span><span class="o">.</span><span class="na">symbol</span> <span class="o">=</span> <span class="n">symbol</span><span class="o">;</span> - <span class="k">this</span><span class="o">.</span><span class="na">count</span> <span class="o">=</span> <span class="n">count</span><span class="o">;</span> - <span class="o">}</span> - - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="n">String</span> <span class="nf">toString</span><span class="o">()</span> <span class="o">{</span> - <span class="k">return</span> <span class="s">"Count{"</span> <span class="o">+</span> - <span class="s">"symbol='"</span> <span class="o">+</span> <span class="n">symbol</span> <span class="o">+</span> <span class="sc">'\''</span> <span class="o">+</span> - <span class="s">", count="</span> <span class="o">+</span> <span class="n">count</span> <span class="o">+</span> - <span class="sc">'}'</span><span class="o">;</span> - <span class="o">}</span> -<span class="o">}</span> - -<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">SendWarning</span> <span class="kd">implements</span> <span class="n">MapWindowFunction</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="o">{</span> - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="nf">mapWindow</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">String</span><span 
class="o">></span> <span class="n">out</span><span class="o">)</span> - <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> - - <span class="k">if</span> <span class="o">(</span><span class="n">values</span><span class="o">.</span><span class="na">iterator</span><span class="o">().</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span> - <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">values</span><span class="o">.</span><span class="na">iterator</span><span class="o">().</span><span class="na">next</span><span class="o">().</span><span class="na">symbol</span><span class="o">);</span> - <span class="o">}</span> - <span class="o">}</span> -<span class="o">}</span></code></pre></div> - - </div> - -</div> - -<p><a href="#top">Back to top</a></p> - -<h2 id="combining-with-a-twitter-stream">Combining with a Twitter stream</h2> - -<p>Next, we will read a Twitter stream and correlate it with our stock -price stream. 
Flink has support for connecting to <a href="https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/twitter.html">Twitter’s -API</a> -but for the sake of this example we generate dummy tweet data.</p> - -<p><img alt="Social media analytics" src="/img/blog/blog_social_media.png" width="100%" class="img-responsive center-block" /></p> - -<div class="codetabs"> - - <div data-lang="scala"> - - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">//Read a stream of tweets</span> -<span class="k">val</span> <span class="n">tweetStream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateTweets</span> <span class="k">_</span><span class="o">)</span> - -<span class="c1">//Extract the stock symbols</span> -<span class="k">val</span> <span class="n">mentionedSymbols</span> <span class="k">=</span> <span class="n">tweetStream</span><span class="o">.</span><span class="n">flatMap</span><span class="o">(</span><span class="n">tweet</span> <span class="k">=></span> <span class="n">tweet</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">" "</span><span class="o">))</span> - <span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">toUpperCase</span><span class="o">())</span> - <span class="o">.</span><span class="n">filter</span><span class="o">(</span><span class="n">symbols</span><span class="o">.</span><span class="n">contains</span><span class="o">(</span><span class="k">_</span><span class="o">))</span> - -<span class="c1">//Count the extracted symbols</span> -<span class="k">val</span> <span class="n">tweetsPerStock</span> <span class="k">=</span> <span class="n">mentionedSymbols</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span 
class="nc">Count</span><span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span> - <span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> - <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span> - <span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="s">"count"</span><span class="o">)</span> - -<span class="k">def</span> <span class="n">generateTweets</span><span class="o">(</span><span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span> - <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span> - <span class="k">val</span> <span class="n">s</span> <span class="k">=</span> <span class="k">for</span> <span class="o">(</span><span class="n">i</span> <span class="k"><-</span> <span class="mi">1</span> <span class="n">to</span> <span class="mi">3</span><span class="o">)</span> <span class="k">yield</span> <span class="o">(</span><span class="n">symbols</span><span class="o">(</span><span class="nc">Random</span><span class="o">.</span><span class="n">nextInt</span><span class="o">(</span><span class="n">symbols</span><span class="o">.</span><span class="n">size</span><span class="o">)))</span> - <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">s</span><span class="o">.</span><span class="n">mkString</span><span class="o">(</span><span class="s">" "</span><span 
class="o">))</span> - <span class="nc">Thread</span><span class="o">.</span><span class="n">sleep</span><span class="o">(</span><span class="nc">Random</span><span class="o">.</span><span class="n">nextInt</span><span class="o">(</span><span class="mi">500</span><span class="o">))</span> - <span class="o">}</span> -<span class="o">}</span></code></pre></div> - - </div> - - <div data-lang="java7"> - - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">//Read a stream of tweets</span> -<span class="n">DataStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">tweetStream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">TweetSource</span><span class="o">());</span> - -<span class="c1">//Extract the stock symbols</span> -<span class="n">DataStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">mentionedSymbols</span> <span class="o">=</span> <span class="n">tweetStream</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span> - <span class="k">new</span> <span class="n">FlatMapFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">out</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> - <span class="n">String</span><span 
class="o">[]</span> <span class="n">words</span> <span class="o">=</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">);</span> - <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span> - <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">word</span><span class="o">.</span><span class="na">toUpperCase</span><span class="o">());</span> - <span class="o">}</span> - <span class="o">}</span> -<span class="o">}).</span><span class="na">filter</span><span class="o">(</span><span class="k">new</span> <span class="n">FilterFunction</span><span class="o"><</span><span class="n">String</span><span class="o">>()</span> <span class="o">{</span> - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">filter</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> - <span class="k">return</span> <span class="n">SYMBOLS</span><span class="o">.</span><span class="na">contains</span><span class="o">(</span><span class="n">value</span><span class="o">);</span> - <span class="o">}</span> -<span class="o">});</span> - -<span class="c1">//Count the extracted symbols</span> -<span class="n">DataStream</span><span class="o"><</span><span class="n">Count</span><span class="o">></span> <span class="n">tweetsPerStock</span> <span class="o">=</span> <span class="n">mentionedSymbols</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o"><</span><span 
class="n">String</span><span class="o">,</span> <span class="n">Count</span><span class="o">>()</span> <span class="o">{</span> - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="n">Count</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> - <span class="k">return</span> <span class="k">new</span> <span class="nf">Count</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="mi">1</span><span class="o">);</span> - <span class="o">}</span> -<span class="o">}).</span><span class="na">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">).</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">)).</span><span class="na">sum</span><span class="o">(</span><span class="s">"count"</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span> - -<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">TweetSource</span> <span class="kd">implements</span> <span class="n">SourceFunction</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="o">{</span> - <span class="n">Random</span> <span class="n">random</span><span class="o">;</span> - <span class="n">StringBuilder</span> <span class="n">stringBuilder</span><span class="o">;</span> - - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="nf">invoke</span><span class="o">(</span><span 
class="n">Collector</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">collector</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> - <span class="n">random</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Random</span><span class="o">();</span> - <span class="n">stringBuilder</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">StringBuilder</span><span class="o">();</span> - - <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span> - <span class="n">stringBuilder</span><span class="o">.</span><span class="na">setLength</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> - <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o"><</span> <span class="mi">3</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span> - <span class="n">stringBuilder</span><span class="o">.</span><span class="na">append</span><span class="o">(</span><span class="s">" "</span><span class="o">);</span> - <span class="n">stringBuilder</span><span class="o">.</span><span class="na">append</span><span class="o">(</span><span class="n">SYMBOLS</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">random</span><span class="o">.</span><span class="na">nextInt</span><span class="o">(</span><span class="n">SYMBOLS</span><span class="o">.</span><span class="na">size</span><span class="o">())));</span> - <span class="o">}</span> - <span class="n">collector</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">stringBuilder</span><span 
class="o">.</span><span class="na">toString</span><span class="o">());</span> - <span class="n">Thread</span><span class="o">.</span><span class="na">sleep</span><span class="o">(</span><span class="mi">500</span><span class="o">);</span> - <span class="o">}</span> - - <span class="o">}</span> -<span class="o">}</span></code></pre></div> - - </div> - -</div> - -<p><a href="#top">Back to top</a></p> - -<h2 id="streaming-joins">Streaming joins</h2> - -<p>Finally, we join real-time tweets and stock prices and compute a -rolling correlation between the number of price warnings and the -number of mentions of a given stock in the Twitter stream. As both of -these data streams are potentially infinite, we apply the join on a -30-second window.</p> - -<p><img alt="Streaming joins" src="/img/blog/blog_stream_join.png" width="60%" class="img-responsive center-block" /></p> - -<div class="codetabs"> - - <div data-lang="scala"> - - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">//Join warnings and parsed tweets</span> -<span class="k">val</span> <span class="n">tweetsAndWarning</span> <span class="k">=</span> <span class="n">warningsPerStock</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">tweetsPerStock</span><span class="o">)</span> - <span class="o">.</span><span class="n">onWindow</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">)</span> - <span class="o">.</span><span class="n">where</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> - <span class="o">.</span><span class="n">equalTo</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> <span class="o">{</span> <span class="o">(</span><span class="n">c1</span><span class="o">,</span> <span class="n">c2</span><span class="o">)</span> <span class="k">=></span> <span class="o">(</span><span 
class="n">c1</span><span class="o">.</span><span class="n">count</span><span class="o">,</span> <span class="n">c2</span><span class="o">.</span><span class="n">count</span><span class="o">)</span> <span class="o">}</span> - -<span class="k">val</span> <span class="n">rollingCorrelation</span> <span class="k">=</span> <span class="n">tweetsAndWarning</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span> - <span class="o">.</span><span class="n">mapWindow</span><span class="o">(</span><span class="n">computeCorrelation</span> <span class="k">_</span><span class="o">)</span> - -<span class="n">rollingCorrelation</span> <span class="n">print</span> - -<span class="c1">//Compute rolling correlation</span> -<span class="k">def</span> <span class="n">computeCorrelation</span><span class="o">(</span><span class="n">input</span><span class="k">:</span> <span class="kt">Iterable</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)],</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">Double</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span> - <span class="k">if</span> <span class="o">(</span><span class="n">input</span><span class="o">.</span><span class="n">nonEmpty</span><span class="o">)</span> <span class="o">{</span> - <span class="k">val</span> <span class="n">var1</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">_1</span><span class="o">)</span> - <span class="k">val</span> <span class="n">mean1</span> <span class="k">=</span> 
<span class="n">average</span><span class="o">(</span><span class="n">var1</span><span class="o">)</span> - <span class="k">val</span> <span class="n">var2</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span> - <span class="k">val</span> <span class="n">mean2</span> <span class="k">=</span> <span class="n">average</span><span class="o">(</span><span class="n">var2</span><span class="o">)</span> - - <span class="k">val</span> <span class="n">cov</span> <span class="k">=</span> <span class="n">average</span><span class="o">(</span><span class="n">var1</span><span class="o">.</span><span class="n">zip</span><span class="o">(</span><span class="n">var2</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">xy</span> <span class="k">=></span> <span class="o">(</span><span class="n">xy</span><span class="o">.</span><span class="n">_1</span> <span class="o">-</span> <span class="n">mean1</span><span class="o">)</span> <span class="o">*</span> <span class="o">(</span><span class="n">xy</span><span class="o">.</span><span class="n">_2</span> <span class="o">-</span> <span class="n">mean2</span><span class="o">)))</span> - <span class="k">val</span> <span class="n">d1</span> <span class="k">=</span> <span class="nc">Math</span><span class="o">.</span><span class="n">sqrt</span><span class="o">(</span><span class="n">average</span><span class="o">(</span><span class="n">var1</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=></span> <span class="nc">Math</span><span class="o">.</span><span class="n">pow</span><span class="o">((</span><span class="n">x</span> <span class="o">-</span> <span class="n">mean1</span><span class="o">),</span> <span class="mi">2</span><span class="o">))))</span> 
- <span class="k">val</span> <span class="n">d2</span> <span class="k">=</span> <span class="nc">Math</span><span class="o">.</span><span class="n">sqrt</span><span class="o">(</span><span class="n">average</span><span class="o">(</span><span class="n">var2</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=></span> <span class="nc">Math</span><span class="o">.</span><span class="n">pow</span><span class="o">((</span><span class="n">x</span> <span class="o">-</span> <span class="n">mean2</span><span class="o">),</span> <span class="mi">2</span><span class="o">))))</span> - - <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">cov</span> <span class="o">/</span> <span class="o">(</span><span class="n">d1</span> <span class="o">*</span> <span class="n">d2</span><span class="o">))</span> - <span class="o">}</span> -<span class="o">}</span></code></pre></div> - - </div> - - <div data-lang="java7"> - - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">//Join warnings and parsed tweets</span> -<span class="n">DataStream</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">tweetsAndWarning</span> <span class="o">=</span> <span class="n">warningsPerStock</span> - <span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">tweetsPerStock</span><span class="o">)</span> - <span class="o">.</span><span class="na">onWindow</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">)</span> - <span class="o">.</span><span class="na">where</span><span class="o">(</span><span 
class="s">"symbol"</span><span class="o">)</span> - <span class="o">.</span><span class="na">equalTo</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> - <span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="k">new</span> <span class="n">JoinFunction</span><span class="o"><</span><span class="n">Count</span><span class="o">,</span> <span class="n">Count</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>>()</span> <span class="o">{</span> - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="nf">join</span><span class="o">(</span><span class="n">Count</span> <span class="n">first</span><span class="o">,</span> <span class="n">Count</span> <span class="n">second</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> - <span class="k">return</span> <span class="k">new</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>(</span><span class="n">first</span><span class="o">.</span><span class="na">count</span><span class="o">,</span> <span class="n">second</span><span class="o">.</span><span class="na">count</span><span class="o">);</span> - <span class="o">}</span> - <span class="o">});</span> - -<span class="c1">//Compute rolling correlation</span> -<span class="n">DataStream</span><span class="o"><</span><span class="n">Double</span><span class="o">></span> <span class="n">rollingCorrelation</span> <span class="o">=</span> <span class="n">tweetsAndWarning</span> - <span class="o">.</span><span 
class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">))</span> - <span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="nf">WindowCorrelation</span><span class="o">());</span> - -<span class="n">rollingCorrelation</span><span class="o">.</span><span class="na">print</span><span class="o">();</span> - -<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">WindowCorrelation</span> - <span class="kd">implements</span> <span class="n">WindowMapFunction</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>,</span> <span class="n">Double</span><span class="o">></span> <span class="o">{</span> - - <span class="kd">private</span> <span class="n">Integer</span> <span class="n">leftSum</span><span class="o">;</span> - <span class="kd">private</span> <span class="n">Integer</span> <span class="n">rightSum</span><span class="o">;</span> - <span class="kd">private</span> <span class="n">Integer</span> <span class="n">count</span><span class="o">;</span> - - <span class="kd">private</span> <span class="n">Double</span> <span class="n">leftMean</span><span class="o">;</span> - <span class="kd">private</span> <span class="n">Double</span> <span class="n">rightMean</span><span class="o">;</span> - - <span class="kd">private</span> <span class="n">Double</span> <span class="n">cov</span><span class="o">;</span> - <span class="kd">private</span> <span class="n">Double</span> <span class="n">leftSd</span><span class="o">;</span> - <span class="kd">private</span> 
<span class="n">Double</span> <span class="n">rightSd</span><span class="o">;</span> - - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="nf">mapWindow</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">Double</span><span class="o">></span> <span class="n">out</span><span class="o">)</span> - <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> - - <span class="n">leftSum</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> - <span class="n">rightSum</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> - <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> - - <span class="n">cov</span> <span class="o">=</span> <span class="mi">0</span><span class="o">.;</span> - <span class="n">leftSd</span> <span class="o">=</span> <span class="mi">0</span><span class="o">.;</span> - <span class="n">rightSd</span> <span class="o">=</span> <span class="mi">0</span><span class="o">.;</span> - - <span class="c1">//compute mean for both sides, save count</span> - <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">pair</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span> - <span class="n">leftSum</span> <span class="o">+=</span> <span class="n">pair</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span> - 
<span class="n">rightSum</span> <span class="o">+=</span> <span class="n">pair</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span> - <span class="n">count</span><span class="o">++;</span> - <span class="o">}</span> - - <span class="n">leftMean</span> <span class="o">=</span> <span class="n">leftSum</span><span class="o">.</span><span class="na">doubleValue</span><span class="o">()</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span> - <span class="n">rightMean</span> <span class="o">=</span> <span class="n">rightSum</span><span class="o">.</span><span class="na">doubleValue</span><span class="o">()</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span> - - <span class="c1">//compute covariance & std. deviations</span> - <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">pair</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span> - <span class="n">cov</span> <span class="o">+=</span> <span class="o">(</span><span class="n">pair</span><span class="o">.</span><span class="na">f0</span> <span class="o">-</span> <span class="n">leftMean</span><span class="o">)</span> <span class="o">*</span> <span class="o">(</span><span class="n">pair</span><span class="o">.</span><span class="na">f1</span> <span class="o">-</span> <span class="n">rightMean</span><span class="o">)</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span> - <span class="o">}</span> - - <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">pair</span> <span class="o">:</span> <span 
class="n">values</span><span class="o">)</span> <span class="o">{</span> - <span class="n">leftSd</span> <span class="o">+=</span> <span class="n">Math</span><span class="o">.</span><span class="na">pow</span><span class="o">(</span><span class="n">pair</span><span class="o">.</span><span class="na">f0</span> <span class="o">-</span> <span class="n">leftMean</span><span class="o">,</span> <span class="mi">2</span><span class="o">)</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span> - <span class="n">rightSd</span> <span class="o">+=</span> <span class="n">Math</span><span class="o">.</span><span class="na">pow</span><span class="o">(</span><span class="n">pair</span><span class="o">.</span><span class="na">f1</span> <span class="o">-</span> <span class="n">rightMean</span><span class="o">,</span> <span class="mi">2</span><span class="o">)</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span> - <span class="o">}</span> - <span class="n">leftSd</span> <span class="o">=</span> <span class="n">Math</span><span class="o">.</span><span class="na">sqrt</span><span class="o">(</span><span class="n">leftSd</span><span class="o">);</span> - <span class="n">rightSd</span> <span class="o">=</span> <span class="n">Math</span><span class="o">.</span><span class="na">sqrt</span><span class="o">(</span><span class="n">rightSd</span><span class="o">);</span> - - <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">cov</span> <span class="o">/</span> <span class="o">(</span><span class="n">leftSd</span> <span class="o">*</span> <span class="n">rightSd</span><span class="o">));</span> - <span class="o">}</span> -<span class="o">}</span></code></pre></div> - - </div> - -</div> - -<p><a href="#top">Back to top</a></p> - -<h2 id="other-things-to-try">Other things to try</h2> - -<p>For a full feature overview please check the <a 
href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html">Streaming Guide</a>, which describes all the available API features. -You are very welcome to try out our features for different use-cases; we are looking forward to hearing about your experiences. Feel free to <a href="http://flink.apache.org/community.html#mailing-lists">contact us</a>.</p> - -<h2 id="upcoming-for-streaming">Upcoming for streaming</h2> - -<p>There are some aspects of Flink Streaming that are subject to -change by the next release, making this application look even nicer.</p> - -<p>Stay tuned for later blog posts on how Flink Streaming works -internally, fault tolerance, and performance measurements!</p> - -<p><a href="#top">Back to top</a></p> - - </article> - </div> - - <div class="row"> - <div id="disqus_thread"></div> - <script type="text/javascript"> - /* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE * * */ - var disqus_shortname = 'stratosphere-eu'; // required: replace example with your forum shortname - - /* * * DON'T EDIT BELOW THIS LINE * * */ - (function() { - var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true; - dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js'; - (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq); - })(); -
<TRUNCATED>
