http://git-wip-us.apache.org/repos/asf/flink-web/blob/f0ac0cdb/content/docs/0.9/apis/dataset_transformations.html ---------------------------------------------------------------------- diff --git a/content/docs/0.9/apis/dataset_transformations.html b/content/docs/0.9/apis/dataset_transformations.html deleted file mode 100644 index 5247ebc..0000000 --- a/content/docs/0.9/apis/dataset_transformations.html +++ /dev/null @@ -1,1720 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
---> -<!DOCTYPE html> - -<html lang="en"> - <head> - <meta charset="utf-8"> - <meta http-equiv="X-UA-Compatible" content="IE=edge"> - <meta name="viewport" content="width=device-width, initial-scale=1"> - <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags --> - - <title>Apache Flink 0.9.0 Documentation: DataSet Transformations</title> - - <link rel="shortcut icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon"> - <link rel="icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon"> - - <!-- Bootstrap --> - <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css"> - <link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/flink.css"> - <link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/syntax.css"> - <link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/codetabs.css"> - - <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> - <!-- WARNING: Respond.js doesn't work if you view the page via file:// --> - <!--[if lt IE 9]> - <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> - <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> - <![endif]--> - </head> - <body> - - - - - - - <!-- Top navbar. --> - <nav class="navbar navbar-default navbar-fixed-top"> - <div class="container"> - <!-- The logo. 
--> - <div class="navbar-header"> - <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1"> - <span class="icon-bar"></span> - <span class="icon-bar"></span> - <span class="icon-bar"></span> - </button> - <div class="navbar-logo"> - <a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/0.9/page/img/navbar-brand-logo.jpg"></a> - </div> - </div><!-- /.navbar-header --> - - <!-- The navigation links. --> - <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1"> - <ul class="nav navbar-nav"> - <li><a href="http://flink.apache.org/docs/0.9/index.html">Overview<span class="hidden-sm hidden-xs"> 0.9.0</span></a></li> - - <!-- Setup --> - <li class="dropdown"> - <a href="http://flink.apache.org/docs/0.9/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a> - <ul class="dropdown-menu" role="menu"> - <li><a href="http://flink.apache.org/docs/0.9/setup/building.html">Get Flink 0.9-SNAPSHOT</a></li> - - <li class="divider"></li> - <li role="presentation" class="dropdown-header"><strong>Deployment</strong></li> - <li><a href="http://flink.apache.org/docs/0.9/setup/local_setup.html" class="active">Local</a></li> - <li><a href="http://flink.apache.org/docs/0.9/setup/cluster_setup.html">Cluster (Standalone)</a></li> - <li><a href="http://flink.apache.org/docs/0.9/setup/yarn_setup.html">YARN</a></li> - <li><a href="http://flink.apache.org/docs/0.9/setup/gce_setup.html">GCloud</a></li> - <li><a href="http://flink.apache.org/docs/0.9/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li> - - <li class="divider"></li> - <li><a href="http://flink.apache.org/docs/0.9/setup/config.html">Configuration</a></li> - </ul> - </li> - - <!-- Programming Guides --> - <li class="dropdown"> - <a href="http://flink.apache.org/docs/0.9/apis" class="dropdown-toggle" 
data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a> - <ul class="dropdown-menu" role="menu"> - <li><a href="http://flink.apache.org/docs/0.9/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li> - <li><a href="http://flink.apache.org/docs/0.9/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li> - <li><a href="http://flink.apache.org/docs/0.9/apis/python.html">Python API <span class="badge">Beta</span></a></li> - - <li class="divider"></li> - <li><a href="scala_shell.html">Interactive Scala Shell</a></li> - <li><a href="http://flink.apache.org/docs/0.9/apis/dataset_transformations.html">Dataset Transformations</a></li> - <li><a href="http://flink.apache.org/docs/0.9/apis/best_practices.html">Best Practices</a></li> - <li><a href="http://flink.apache.org/docs/0.9/apis/example_connectors.html">Connectors</a></li> - <li><a href="http://flink.apache.org/docs/0.9/apis/examples.html">Examples</a></li> - <li><a href="http://flink.apache.org/docs/0.9/apis/local_execution.html">Local Execution</a></li> - <li><a href="http://flink.apache.org/docs/0.9/apis/cluster_execution.html">Cluster Execution</a></li> - <li><a href="http://flink.apache.org/docs/0.9/apis/cli.html">Command Line Interface</a></li> - <li><a href="http://flink.apache.org/docs/0.9/apis/web_client.html">Web Client</a></li> - <li><a href="http://flink.apache.org/docs/0.9/apis/iterations.html">Iterations</a></li> - <li><a href="http://flink.apache.org/docs/0.9/apis/java8.html">Java 8</a></li> - <li><a href="http://flink.apache.org/docs/0.9/apis/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li> - </ul> - </li> - - <!-- Libraries --> - <li class="dropdown"> - <a href="http://flink.apache.org/docs/0.9/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a> - <ul 
class="dropdown-menu" role="menu"> - <li><a href="http://flink.apache.org/docs/0.9/libs/spargel_guide.html">Graphs: Spargel</a></li> - <li><a href="http://flink.apache.org/docs/0.9/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li> - <li><a href="http://flink.apache.org/docs/0.9/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li> - <li><a href="http://flink.apache.org/docs/0.9/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li> - </ul> - </li> - - <!-- Internals --> - <li class="dropdown"> - <a href="http://flink.apache.org/docs/0.9/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a> - <ul class="dropdown-menu" role="menu"> - <li role="presentation" class="dropdown-header"><strong>Contribute</strong></li> - <li><a href="http://flink.apache.org/docs/0.9/internals/how_to_contribute.html">How to Contribute</a></li> - <li><a href="http://flink.apache.org/docs/0.9/internals/coding_guidelines.html">Coding Guidelines</a></li> - <li><a href="http://flink.apache.org/docs/0.9/internals/ide_setup.html">IDE Setup</a></li> - <li><a href="http://flink.apache.org/docs/0.9/internals/logging.html">Logging</a></li> - <li class="divider"></li> - <li role="presentation" class="dropdown-header"><strong>Internals</strong></li> - <li><a href="http://flink.apache.org/docs/0.9/internals/general_arch.html">Architecture & Process Model</a></li> - <li><a href="http://flink.apache.org/docs/0.9/internals/types_serialization.html">Type Extraction & Serialization</a></li> - <li><a href="http://flink.apache.org/docs/0.9/internals/job_scheduling.html">Jobs & Scheduling</a></li> - <li><a href="http://flink.apache.org/docs/0.9/internals/add_operator.html">How-To: Add an Operator</a></li> - </ul> - </li> - </ul> - <form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/0.9/search-results.html"> - 
<div class="form-group"> - <input type="text" class="form-control" name="q" placeholder="Search all pages"> - </div> - <button type="submit" class="btn btn-default">Search</button> - </form> - </div><!-- /.navbar-collapse --> - </div><!-- /.container --> - </nav> - - - - - <!-- Main content. --> - <div class="container"> - - -<div class="row"> - <div class="col-sm-10 col-sm-offset-1"> - <h1>DataSet Transformations</h1> - - - -<p>This document gives a deep-dive into the available transformations on DataSets. For a general introduction to the -Flink Java API, please refer to the <a href="programming_guide.html">Programming Guide</a>.</p> - -<ul id="markdown-toc"> - <li><a href="#map" id="markdown-toc-map">Map</a></li> - <li><a href="#flatmap" id="markdown-toc-flatmap">FlatMap</a></li> - <li><a href="#mappartition" id="markdown-toc-mappartition">MapPartition</a></li> - <li><a href="#filter" id="markdown-toc-filter">Filter</a></li> - <li><a href="#project-tuple-datasets-only-javapython-api-only" id="markdown-toc-project-tuple-datasets-only-javapython-api-only">Project (Tuple DataSets only) (Java/Python API Only)</a></li> - <li><a href="#transformations-on-grouped-dataset" id="markdown-toc-transformations-on-grouped-dataset">Transformations on Grouped DataSet</a></li> - <li><a href="#transformations-on-grouped-dataset-1" id="markdown-toc-transformations-on-grouped-dataset-1">Transformations on Grouped DataSet</a></li> - <li><a href="#reduce-on-grouped-dataset" id="markdown-toc-reduce-on-grouped-dataset">Reduce on Grouped DataSet</a></li> - <li><a href="#groupreduce-on-grouped-dataset" id="markdown-toc-groupreduce-on-grouped-dataset">GroupReduce on Grouped DataSet</a></li> - <li><a href="#groupcombine-on-a-grouped-dataset" id="markdown-toc-groupcombine-on-a-grouped-dataset">GroupCombine on a Grouped DataSet</a></li> - <li><a href="#aggregate-on-grouped-tuple-dataset" id="markdown-toc-aggregate-on-grouped-tuple-dataset">Aggregate on Grouped Tuple DataSet</a></li> - <li><a 
href="#reduce-on-full-dataset" id="markdown-toc-reduce-on-full-dataset">Reduce on full DataSet</a></li> - <li><a href="#groupreduce-on-full-dataset" id="markdown-toc-groupreduce-on-full-dataset">GroupReduce on full DataSet</a></li> - <li><a href="#groupcombine-on-a-full-dataset" id="markdown-toc-groupcombine-on-a-full-dataset">GroupCombine on a full DataSet</a></li> - <li><a href="#aggregate-on-full-tuple-dataset" id="markdown-toc-aggregate-on-full-tuple-dataset">Aggregate on full Tuple DataSet</a></li> - <li><a href="#join" id="markdown-toc-join">Join</a></li> - <li><a href="#cross" id="markdown-toc-cross">Cross</a></li> - <li><a href="#cogroup" id="markdown-toc-cogroup">CoGroup</a></li> - <li><a href="#union" id="markdown-toc-union">Union</a></li> - <li><a href="#rebalance" id="markdown-toc-rebalance">Rebalance</a></li> - <li><a href="#hash-partition" id="markdown-toc-hash-partition">Hash-Partition</a></li> - <li><a href="#sort-partition" id="markdown-toc-sort-partition">Sort Partition</a></li> - <li><a href="#first-n" id="markdown-toc-first-n">First-n</a></li> -</ul> - -<h3 id="map">Map</h3> - -<p>The Map transformation applies a user-defined map function on each element of a DataSet. 
-It implements a one-to-one mapping, that is, exactly one element must be returned by -the function.</p> - -<p>The following code transforms a DataSet of Integer pairs into a DataSet of Integers:</p> - -<div class="codetabs"> - <div data-lang="java"> - - <div class="highlight"><pre><code class="language-java"><span class="c1">// MapFunction that adds two integer values</span> -<span class="kd">public</span> <span class="kd">class</span> <span class="nc">IntAdder</span> <span class="kd">implements</span> <span class="n">MapFunction</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>,</span> <span class="n">Integer</span><span class="o">></span> <span class="o">{</span> - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span> - <span class="k">return</span> <span class="n">in</span><span class="o">.</span><span class="na">f0</span> <span class="o">+</span> <span class="n">in</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span> - <span class="o">}</span> -<span class="o">}</span> - -<span class="c1">// [...]</span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">intPairs</span> <span class="o">=</span> <span class="c1">// [...]</span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">intSums</span> <span class="o">=</span> 
<span class="n">intPairs</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="nf">IntAdder</span><span class="o">());</span></code></pre></div> - - </div> - <div data-lang="scala"> - - <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">intPairs</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span> -<span class="k">val</span> <span class="n">intSums</span> <span class="k">=</span> <span class="n">intPairs</span><span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="n">pair</span> <span class="k">=></span> <span class="n">pair</span><span class="o">.</span><span class="n">_1</span> <span class="o">+</span> <span class="n">pair</span><span class="o">.</span><span class="n">_2</span> <span class="o">}</span></code></pre></div> - - </div> - <div data-lang="python"> - - <div class="highlight"><pre><code class="language-python"> <span class="n">intSums</span> <span class="o">=</span> <span class="n">intPairs</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">sum</span><span class="p">(</span><span class="n">x</span><span class="p">),</span> <span class="n">INT</span><span class="p">)</span></code></pre></div> - - </div> -</div> - -<h3 id="flatmap">FlatMap</h3> - -<p>The FlatMap transformation applies a user-defined flat-map function on each element of a DataSet. 
-This variant of a map function can return arbitrarily many result elements (including none) for each input element.</p> - -<p>The following code transforms a DataSet of text lines into a DataSet of words:</p> - -<div class="codetabs"> - <div data-lang="java"> - - <div class="highlight"><pre><code class="language-java"><span class="c1">// FlatMapFunction that tokenizes a String by non-word characters and emits all String tokens.</span> -<span class="kd">public</span> <span class="kd">class</span> <span class="nc">Tokenizer</span> <span class="kd">implements</span> <span class="n">FlatMapFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="o">{</span> - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> - <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">token</span> <span class="o">:</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">"\\W"</span><span class="o">))</span> <span class="o">{</span> - <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">token</span><span class="o">);</span> - <span class="o">}</span> - <span class="o">}</span> -<span class="o">}</span> - -<span class="c1">// [...]</span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">textLines</span> <span class="o">=</span> <span class="c1">// [...]</span> -<span 
class="n">DataSet</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">words</span> <span class="o">=</span> <span class="n">textLines</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">Tokenizer</span><span class="o">());</span></code></pre></div> - - </div> - <div data-lang="scala"> - - <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">textLines</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span> -<span class="k">val</span> <span class="n">words</span> <span class="k">=</span> <span class="n">textLines</span><span class="o">.</span><span class="n">flatMap</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">" "</span><span class="o">)</span> <span class="o">}</span></code></pre></div> - - </div> - <div data-lang="python"> - - <div class="highlight"><pre><code class="language-python"> <span class="n">words</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">flat_map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span><span class="n">c</span><span class="p">:</span> <span class="p">[</span><span class="n">line</span><span class="o">.</span><span class="n">split</span><span class="p">()</span> <span class="k">for</span> <span class="n">line</span> <span class="ow">in</span> <span class="n">x</span><span class="p">],</span> <span class="n">STRING</span><span class="p">)</span></code></pre></div> - - </div> -</div> - -<h3 id="mappartition">MapPartition</h3> - -<p>MapPartition transforms a parallel partition in a single function call. 
The map-partition function -gets the partition as Iterable and can produce an arbitrary number of result values. The number of elements in each partition depends on the degree-of-parallelism -and previous operations.</p> - -<p>The following code transforms a DataSet of text lines into a DataSet of counts per partition:</p> - -<div class="codetabs"> - <div data-lang="java"> - - <div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">PartitionCounter</span> <span class="kd">implements</span> <span class="n">MapPartitionFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="o">{</span> - - <span class="kd">public</span> <span class="kt">void</span> <span class="nf">mapPartition</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">Long</span><span class="o">></span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> - <span class="kt">long</span> <span class="n">c</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> - <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">s</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span> - <span class="n">c</span><span class="o">++;</span> - <span class="o">}</span> - <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">c</span><span class="o">);</span> - <span class="o">}</span> -<span class="o">}</span> - -<span class="c1">// [...]</span> -<span class="n">DataSet</span><span class="o"><</span><span 
class="n">String</span><span class="o">></span> <span class="n">textLines</span> <span class="o">=</span> <span class="c1">// [...]</span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">Long</span><span class="o">></span> <span class="n">counts</span> <span class="o">=</span> <span class="n">textLines</span><span class="o">.</span><span class="na">mapPartition</span><span class="o">(</span><span class="k">new</span> <span class="nf">PartitionCounter</span><span class="o">());</span></code></pre></div> - - </div> - <div data-lang="scala"> - - <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">textLines</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span> -<span class="c1">// Some is required because the return value must be a Collection.</span> -<span class="c1">// There is an implicit conversion from Option to a Collection.</span> -<span class="k">val</span> <span class="n">counts</span> <span class="k">=</span> <span class="n">textLines</span><span class="o">.</span><span class="n">mapPartition</span> <span class="o">{</span> <span class="n">in</span> <span class="k">=></span> <span class="nc">Some</span><span class="o">(</span><span class="n">in</span><span class="o">.</span><span class="n">size</span><span class="o">)</span> <span class="o">}</span></code></pre></div> - - </div> - <div data-lang="python"> - - <div class="highlight"><pre><code class="language-python"> <span class="n">counts</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">map_partition</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span><span class="n">c</span><span class="p">:</span> <span class="p">[</span><span class="nb">sum</span><span class="p">(</span><span 
class="mi">1</span> <span class="k">for</span> <span class="n">_</span> <span class="ow">in</span> <span class="n">x</span><span class="p">)],</span> <span class="n">INT</span><span class="p">)</span></code></pre></div> - - </div> -</div> - -<h3 id="filter">Filter</h3> - -<p>The Filter transformation applies a user-defined filter function on each element of a DataSet and retains only those elements for which the function returns <code>true</code>.</p> - -<p>The following code removes all Integers smaller than zero from a DataSet:</p> - -<div class="codetabs"> - <div data-lang="java"> - - <div class="highlight"><pre><code class="language-java"><span class="c1">// FilterFunction that filters out all Integers smaller than zero.</span> -<span class="kd">public</span> <span class="kd">class</span> <span class="nc">NaturalNumberFilter</span> <span class="kd">implements</span> <span class="n">FilterFunction</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="o">{</span> - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">filter</span><span class="o">(</span><span class="n">Integer</span> <span class="n">number</span><span class="o">)</span> <span class="o">{</span> - <span class="k">return</span> <span class="n">number</span> <span class="o">>=</span> <span class="mi">0</span><span class="o">;</span> - <span class="o">}</span> -<span class="o">}</span> - -<span class="c1">// [...]</span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">intNumbers</span> <span class="o">=</span> <span class="c1">// [...]</span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">naturalNumbers</span> <span class="o">=</span> <span class="n">intNumbers</span><span class="o">.</span><span class="na">filter</span><span 
class="o">(</span><span class="k">new</span> <span class="nf">NaturalNumberFilter</span><span class="o">());</span></code></pre></div> - - </div> - <div data-lang="scala"> - - <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">intNumbers</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span> -<span class="k">val</span> <span class="n">naturalNumbers</span> <span class="k">=</span> <span class="n">intNumbers</span><span class="o">.</span><span class="n">filter</span> <span class="o">{</span> <span class="k">_</span> <span class="o">>=</span> <span class="mi">0</span> <span class="o">}</span></code></pre></div> - - </div> - <div data-lang="python"> - - <div class="highlight"><pre><code class="language-python"> <span class="n">naturalNumbers</span> <span class="o">=</span> <span class="n">intNumbers</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span> <span class="o">>=</span> <span class="mi">0</span><span class="p">)</span></code></pre></div> - - </div> -</div> - -<p><strong>IMPORTANT:</strong> The system assumes that the function does not modify the elements on which the predicate is applied. Violating this assumption -can lead to incorrect results.</p> - -<h3 id="project-tuple-datasets-only-javapython-api-only">Project (Tuple DataSets only) (Java/Python API Only)</h3> - -<p>The Project transformation removes or moves Tuple fields of a Tuple DataSet. 
-The <code>project(int...)</code> method selects Tuple fields that should be retained by their index and defines their order in the output Tuple.</p> - -<p>Projections do not require the definition of a user function.</p> - -<p>The following code shows different ways to apply a Project transformation on a DataSet:</p> - -<div class="codetabs"> - <div data-lang="java"> - - <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">in</span> <span class="o">=</span> <span class="c1">// [...]</span> -<span class="c1">// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer></span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">out</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">project</span><span class="o">(</span><span class="mi">2</span><span class="o">,</span><span class="mi">0</span><span class="o">);</span></code></pre></div> - - <h4 id="projection-with-type-hint">Projection with Type Hint</h4> - - <p>Note that the Java compiler cannot infer the return type of <code>project</code> operator. 
This can cause a problem if you call another operator on a result of <code>project</code> operator such as:</p> - - <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple5</span><span class="o"><</span><span class="n">String</span><span class="o">,</span><span class="n">String</span><span class="o">,</span><span class="n">String</span><span class="o">,</span><span class="n">String</span><span class="o">,</span><span class="n">String</span><span class="o">>></span> <span class="n">ds</span> <span class="o">=</span> <span class="o">....</span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple1</span><span class="o"><</span><span class="n">String</span><span class="o">>></span> <span class="n">ds2</span> <span class="o">=</span> <span class="n">ds</span><span class="o">.</span><span class="na">project</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">distinct</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span></code></pre></div> - - <p>This problem can be overcome by hinting the return type of <code>project</code> operator like this:</p> - - <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple1</span><span class="o"><</span><span class="n">String</span><span class="o">>></span> <span class="n">ds2</span> <span class="o">=</span> <span class="n">ds</span><span class="o">.<</span><span class="n">Tuple1</span><span class="o"><</span><span class="n">String</span><span class="o">>></span><span class="n">project</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">distinct</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span></code></pre></div> - - <h3 id="transformations-on-grouped-dataset">Transformations on Grouped DataSet</h3> - - <p>The reduce 
operations can operate on grouped data sets. Specifying the key to -be used for grouping can be done in many ways:</p> - - <ul> - <li>key expressions</li> - <li>a key-selector function</li> - <li>one or more field position keys (Tuple DataSet only)</li> - <li>Case Class fields (Case Classes only)</li> - </ul> - - <p>Please look at the reduce examples to see how the grouping keys are specified.</p> - - </div> - <div data-lang="python"> - - <div class="highlight"><pre><code class="language-python"><span class="n">out</span> <span class="o">=</span> <span class="ow">in</span><span class="o">.</span><span class="n">project</span><span class="p">(</span><span class="mi">2</span><span class="p">,</span><span class="mi">0</span><span class="p">);</span></code></pre></div> - - <h3 id="transformations-on-grouped-dataset-1">Transformations on Grouped DataSet</h3> - - <p>The reduce operations can operate on grouped data sets. Specifying the key to -be used for grouping can be done using one or more field position keys (Tuple DataSet only).</p> - - <p>Please look at the reduce examples to see how the grouping keys are specified.</p> - - </div> -</div> - -<h3 id="reduce-on-grouped-dataset">Reduce on Grouped DataSet</h3> - -<p>A Reduce transformation that is applied on a grouped DataSet reduces each group to a single -element using a user-defined reduce function. -For each group of input elements, a reduce function successively combines pairs of elements into one -element until only a single element for each group remains.</p> - -<h4 id="reduce-on-dataset-grouped-by-keyselector-function">Reduce on DataSet Grouped by KeySelector Function</h4> - -<p>A key-selector function extracts a key value from each element of a DataSet. The extracted key -value is used to group the DataSet. 
-The following code shows how to group a POJO DataSet using a key-selector function and to reduce it -with a reduce function.</p> - -<div class="codetabs"> - <div data-lang="java"> - - <div class="highlight"><pre><code class="language-java"><span class="c1">// some ordinary POJO</span> -<span class="kd">public</span> <span class="kd">class</span> <span class="nc">WC</span> <span class="o">{</span> - <span class="kd">public</span> <span class="n">String</span> <span class="n">word</span><span class="o">;</span> - <span class="kd">public</span> <span class="kt">int</span> <span class="n">count</span><span class="o">;</span> - <span class="c1">// [...]</span> -<span class="o">}</span> - -<span class="c1">// ReduceFunction that sums Integer attributes of a POJO</span> -<span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordCounter</span> <span class="kd">implements</span> <span class="n">ReduceFunction</span><span class="o"><</span><span class="n">WC</span><span class="o">></span> <span class="o">{</span> - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="n">WC</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">WC</span> <span class="n">in1</span><span class="o">,</span> <span class="n">WC</span> <span class="n">in2</span><span class="o">)</span> <span class="o">{</span> - <span class="k">return</span> <span class="k">new</span> <span class="nf">WC</span><span class="o">(</span><span class="n">in1</span><span class="o">.</span><span class="na">word</span><span class="o">,</span> <span class="n">in1</span><span class="o">.</span><span class="na">count</span> <span class="o">+</span> <span class="n">in2</span><span class="o">.</span><span class="na">count</span><span class="o">);</span> - <span class="o">}</span> -<span class="o">}</span> - -<span class="c1">// [...]</span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">WC</span><span class="o">></span> <span 
class="n">words</span> <span class="o">=</span> <span class="c1">// [...]</span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">WC</span><span class="o">></span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">words</span> - <span class="c1">// DataSet grouping on field "word"</span> - <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">"word"</span><span class="o">)</span> - <span class="c1">// apply ReduceFunction on grouped DataSet</span> - <span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nf">WordCounter</span><span class="o">());</span></code></pre></div> - - </div> - <div data-lang="scala"> - - <div class="highlight"><pre><code class="language-scala"><span class="c1">// some ordinary POJO</span> -<span class="k">class</span> <span class="nc">WC</span><span class="o">(</span><span class="k">val</span> <span class="n">word</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="k">val</span> <span class="n">count</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> <span class="o">{</span> - <span class="k">def</span> <span class="k">this</span><span class="o">()</span> <span class="o">{</span> - <span class="k">this</span><span class="o">(</span><span class="kc">null</span><span class="o">,</span> <span class="o">-</span><span class="mi">1</span><span class="o">)</span> - <span class="o">}</span> - <span class="c1">// [...]</span> -<span class="o">}</span> - -<span class="k">val</span> <span class="n">words</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">WC</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span> -<span class="k">val</span> <span class="n">wordCounts</span> <span class="k">=</span> <span class="n">words</span><span 
class="o">.</span><span class="n">groupBy</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">word</span> <span class="o">}</span> <span class="n">reduce</span> <span class="o">{</span> - <span class="o">(</span><span class="n">w1</span><span class="o">,</span> <span class="n">w2</span><span class="o">)</span> <span class="k">=></span> <span class="k">new</span> <span class="nc">WC</span><span class="o">(</span><span class="n">w1</span><span class="o">.</span><span class="n">word</span><span class="o">,</span> <span class="n">w1</span><span class="o">.</span><span class="n">count</span> <span class="o">+</span> <span class="n">w2</span><span class="o">.</span><span class="n">count</span><span class="o">)</span> -<span class="o">}</span></code></pre></div> - - </div> - <div data-lang="python"> - - <div class="highlight"><pre><code class="language-python"><span class="n">Not</span> <span class="n">supported</span><span class="o">.</span></code></pre></div> - </div> -</div> - -<h4 id="reduce-on-dataset-grouped-by-field-position-keys-tuple-datasets-only">Reduce on DataSet Grouped by Field Position Keys (Tuple DataSets only)</h4> - -<p>Field position keys specify one or more fields of a Tuple DataSet that are used as grouping keys. 
-The following code shows how to use field position keys and apply a reduce function.</p> - -<div class="codetabs"> - <div data-lang="java"> - - <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="n">tuples</span> <span class="o">=</span> <span class="c1">// [...]</span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="n">reducedTuples</span> <span class="o">=</span> - <span class="n">tuples</span> - <span class="c1">// group DataSet on first and second field of Tuple</span> - <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span><span class="mi">1</span><span class="o">)</span> - <span class="c1">// apply ReduceFunction on grouped DataSet</span> - <span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nf">MyTupleReducer</span><span class="o">());</span></code></pre></div> - - </div> - <div data-lang="scala"> - - <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">tuples</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span> -<span class="c1">// group on the first and second Tuple field</span> -<span class="k">val</span> <span class="n">reducedTuples</span> <span
class="k">=</span> <span class="n">tuples</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="mi">1</span><span class="o">).</span><span class="n">reduce</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span></code></pre></div> - - <h4 id="reduce-on-dataset-grouped-by-case-class-fields">Reduce on DataSet Grouped by Case Class Fields</h4> - - <p>When using Case Classes you can also specify the grouping key using the names of the fields:</p> - - <div class="highlight"><pre><code class="language-scala"><span class="k">case</span> <span class="k">class</span> <span class="nc">MyClass</span><span class="o">(</span><span class="k">val</span> <span class="n">a</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">b</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">c</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span> -<span class="k">val</span> <span class="n">tuples</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">MyClass</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span> -<span class="c1">// group on the first and second field</span> -<span class="k">val</span> <span class="n">reducedTuples</span> <span class="k">=</span> <span class="n">tuples</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"a"</span><span class="o">,</span> <span class="s">"b"</span><span class="o">).</span><span class="n">reduce</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span></code></pre></div> - - </div> - <div data-lang="python"> - - <div class="highlight"><pre><code class="language-python"> <span class="n">reducedTuples</span> <span class="o">=</span> <span
class="n">tuples</span><span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span><span class="o">.</span><span class="n">reduce</span><span class="p">(</span> <span class="o">...</span> <span class="p">)</span></code></pre></div> - - </div> -</div> - -<h3 id="groupreduce-on-grouped-dataset">GroupReduce on Grouped DataSet</h3> - -<p>A GroupReduce transformation that is applied on a grouped DataSet calls a user-defined -group-reduce function for each group. The difference -between this and <em>Reduce</em> is that the user defined function gets the whole group at once. -The function is invoked with an Iterable over all elements of a group and can return an arbitrary -number of result elements.</p> - -<h4 id="groupreduce-on-dataset-grouped-by-field-position-keys-tuple-datasets-only">GroupReduce on DataSet Grouped by Field Position Keys (Tuple DataSets only)</h4> - -<p>The following code shows how duplicate strings can be removed from a DataSet grouped by Integer.</p> - -<div class="codetabs"> - <div data-lang="java"> - - <div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">DistinctReduce</span> - <span class="kd">implements</span> <span class="n">GroupReduceFunction</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>,</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="o">{</span> - - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span 
class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">in</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> - - <span class="n">Set</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">uniqStrings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashSet</span><span class="o"><</span><span class="n">String</span><span class="o">>();</span> - <span class="n">Integer</span> <span class="n">key</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> - - <span class="c1">// add all strings of the group to the set</span> - <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">t</span> <span class="o">:</span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span> - <span class="n">key</span> <span class="o">=</span> <span class="n">t</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span> - <span class="n">uniqStrings</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">t</span><span class="o">.</span><span class="na">f1</span><span class="o">);</span> - <span class="o">}</span> - - <span class="c1">// emit all unique strings.</span> - <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">s</span> <span class="o">:</span> <span class="n">uniqStrings</span><span class="o">)</span> 
<span class="o">{</span> - <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>(</span><span class="n">key</span><span class="o">,</span> <span class="n">s</span><span class="o">));</span> - <span class="o">}</span> - <span class="o">}</span> -<span class="o">}</span> - -<span class="c1">// [...]</span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">output</span> <span class="o">=</span> <span class="n">input</span> - <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="c1">// group DataSet by the first tuple field</span> - <span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="nf">DistinctReduce</span><span class="o">());</span> <span class="c1">// apply GroupReduceFunction</span></code></pre></div> - - </div> - <div data-lang="scala"> - - <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span> -<span class="k">val</span> 
<span class="n">output</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">reduceGroup</span> <span class="o">{</span> - <span class="o">(</span><span class="n">in</span><span class="o">,</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)])</span> <span class="k">=></span> - <span class="n">in</span><span class="o">.</span><span class="n">toSet</span> <span class="n">foreach</span> <span class="o">(</span><span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">)</span> - <span class="o">}</span></code></pre></div> - - <h4 id="groupreduce-on-dataset-grouped-by-case-class-fields">GroupReduce on DataSet Grouped by Case Class Fields</h4> - - <p>Works analogous to grouping by Case Class fields in <em>Reduce</em> transformations.</p> - - </div> - <div data-lang="python"> - - <div class="highlight"><pre><code class="language-python"> <span class="k">class</span> <span class="nc">DistinctReduce</span><span class="p">(</span><span class="n">GroupReduceFunction</span><span class="p">):</span> - <span class="k">def</span> <span class="nf">reduce</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">):</span> - <span class="n">dic</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">()</span> - <span class="k">for</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span> - <span class="n">dic</span><span class="p">[</span><span class="n">value</span><span class="p">[</span><span class="mi">1</span><span class="p">]]</span> <span 
class="o">=</span> <span class="mi">1</span> - <span class="k">for</span> <span class="n">key</span> <span class="ow">in</span> <span class="n">dic</span><span class="o">.</span><span class="n">keys</span><span class="p">():</span> - <span class="n">collector</span><span class="o">.</span><span class="n">collect</span><span class="p">(</span><span class="n">key</span><span class="p">)</span> - - <span class="n">output</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span><span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="n">DistinctReduce</span><span class="p">(),</span> <span class="n">STRING</span><span class="p">)</span></code></pre></div> - - </div> -</div> - -<h4 id="groupreduce-on-dataset-grouped-by-keyselector-function">GroupReduce on DataSet Grouped by KeySelector Function</h4> - -<p>Works analogous to key-selector functions in <em>Reduce</em> transformations.</p> - -<h4 id="groupreduce-on-sorted-groups">GroupReduce on sorted groups</h4> - -<p>A group-reduce function accesses the elements of a group using an Iterable. Optionally, the Iterable can hand out the elements of a group in a specified order. 
In many cases this can help to reduce the complexity of a user-defined -group-reduce function and improve its efficiency.</p> - -<p>The following code shows another example how to remove duplicate Strings in a DataSet grouped by an Integer and sorted by String.</p> - -<div class="codetabs"> - <div data-lang="java"> - - <div class="highlight"><pre><code class="language-java"><span class="c1">// GroupReduceFunction that removes consecutive identical elements</span> -<span class="kd">public</span> <span class="kd">class</span> <span class="nc">DistinctReduce</span> - <span class="kd">implements</span> <span class="n">GroupReduceFunction</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>,</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="o">{</span> - - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">in</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> - <span class="n">Integer</span> <span class="n">key</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> - <span class="n">String</span> <span class="n">comp</span> <span class="o">=</span> <span class="kc">null</span><span 
class="o">;</span> - - <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">t</span> <span class="o">:</span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span> - <span class="n">key</span> <span class="o">=</span> <span class="n">t</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span> - <span class="n">String</span> <span class="n">next</span> <span class="o">=</span> <span class="n">t</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span> - - <span class="c1">// check if strings are different</span> - <span class="k">if</span> <span class="o">(</span><span class="n">comp</span> <span class="o">==</span> <span class="kc">null</span> <span class="o">||</span> <span class="o">!</span><span class="n">next</span><span class="o">.</span><span class="na">equals</span><span class="o">(</span><span class="n">comp</span><span class="o">))</span> <span class="o">{</span> - <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>(</span><span class="n">key</span><span class="o">,</span> <span class="n">next</span><span class="o">));</span> - <span class="n">comp</span> <span class="o">=</span> <span class="n">next</span><span class="o">;</span> - <span class="o">}</span> - <span class="o">}</span> - <span class="o">}</span> -<span class="o">}</span> - -<span class="c1">// [...]</span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span>
<span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">output</span> <span class="o">=</span> <span class="n">input</span> - <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="c1">// group DataSet by first field</span> - <span class="o">.</span><span class="na">sortGroup</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">Order</span><span class="o">.</span><span class="na">ASCENDING</span><span class="o">)</span> <span class="c1">// sort groups on second tuple field</span> - <span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="nf">DistinctReduce</span><span class="o">());</span></code></pre></div> - - </div> - <div data-lang="scala"> - - <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span> -<span class="k">val</span> <span class="n">output</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">sortGroup</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="nc">Order</span><span class="o">.</span><span class="nc">ASCENDING</span><span class="o">).</span><span class="n">reduceGroup</span> <span class="o">{</span> - <span class="o">(</span><span class="n">in</span><span class="o">,</span> <span class="n">out</span><span class="k">:</span> <span
class="kt">Collector</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)])</span> <span class="k">=></span> - <span class="k">var</span> <span class="n">prev</span><span class="k">:</span> <span class="o">(</span><span class="kt">Int</span><span class="o">,</span> <span class="kt">String</span><span class="o">)</span> <span class="k">=</span> <span class="kc">null</span> - <span class="k">for</span> <span class="o">(</span><span class="n">t</span> <span class="k"><-</span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span> - <span class="k">if</span> <span class="o">(</span><span class="n">prev</span> <span class="o">==</span> <span class="kc">null</span> <span class="o">||</span> <span class="n">prev</span> <span class="o">!=</span> <span class="n">t</span><span class="o">)</span> - <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">t</span><span class="o">)</span> - <span class="n">prev</span> <span class="k">=</span> <span class="n">t</span> - <span class="o">}</span> - <span class="o">}</span></code></pre></div> - - </div> - <div data-lang="python"> - - <div class="highlight"><pre><code class="language-python"> <span class="k">class</span> <span class="nc">DistinctReduce</span><span class="p">(</span><span class="n">GroupReduceFunction</span><span class="p">):</span> - <span class="k">def</span> <span class="nf">reduce</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">):</span> - <span class="n">dic</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">()</span> - <span class="k">for</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span> - <span class="n">dic</span><span class="p">[</span><span class="n">value</span><span class="p">[</span><span class="mi">1</span><span
class="p">]]</span> <span class="o">=</span> <span class="mi">1</span> - <span class="k">for</span> <span class="n">key</span> <span class="ow">in</span> <span class="n">dic</span><span class="o">.</span><span class="n">keys</span><span class="p">():</span> - <span class="n">collector</span><span class="o">.</span><span class="n">collect</span><span class="p">(</span><span class="n">key</span><span class="p">)</span> - - <span class="n">output</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span><span class="o">.</span><span class="n">sort_group</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="n">Order</span><span class="o">.</span><span class="n">ASCENDING</span><span class="p">)</span><span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="n">DistinctReduce</span><span class="p">(),</span> <span class="n">STRING</span><span class="p">)</span></code></pre></div> - - </div> -</div> - -<p><strong>Note:</strong> A GroupSort often comes for free if the grouping is established using a sort-based execution strategy of an operator before the reduce operation.</p> - -<h4 id="combinable-groupreducefunctions">Combinable GroupReduceFunctions</h4> - -<p>In contrast to a reduce function, a group-reduce function is not -necessarily combinable. 
In order to make a group-reduce function -combinable, you need to use the <code>RichGroupReduceFunction</code> variant, -implement (override) the <code>combine()</code> method, and annotate the -<code>RichGroupReduceFunction</code> with the <code>@Combinable</code> annotation as shown here:</p> - -<div class="codetabs"> - <div data-lang="java"> - - <div class="highlight"><pre><code class="language-java"><span class="c1">// Combinable GroupReduceFunction that computes two sums.</span> -<span class="c1">// Note that we use the RichGroupReduceFunction because it defines the combine method</span> -<span class="nd">@Combinable</span> -<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCombinableGroupReducer</span> - <span class="kd">extends</span> <span class="n">RichGroupReduceFunction</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">>,</span> - <span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="o">{</span> - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="n">in</span><span class="o">,</span> - <span class="n">Collector</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span 
class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> - - <span class="n">String</span> <span class="n">key</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> - <span class="kt">int</span> <span class="n">intSum</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> - <span class="kt">double</span> <span class="n">doubleSum</span> <span class="o">=</span> <span class="mf">0.0</span><span class="o">;</span> - - <span class="k">for</span> <span class="o">(</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">></span> <span class="n">curr</span> <span class="o">:</span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span> - <span class="n">key</span> <span class="o">=</span> <span class="n">curr</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span> - <span class="n">intSum</span> <span class="o">+=</span> <span class="n">curr</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span> - <span class="n">doubleSum</span> <span class="o">+=</span> <span class="n">curr</span><span class="o">.</span><span class="na">f2</span><span class="o">;</span> - <span class="o">}</span> - <span class="c1">// emit a tuple with both sums</span> - <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">>(</span><span class="n">key</span><span class="o">,</span> <span class="n">intSum</span><span class="o">,</span> <span 
class="n">doubleSum</span><span class="o">));</span> - <span class="o">}</span> - - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="nf">combine</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="n">in</span><span class="o">,</span> - <span class="n">Collector</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> - <span class="c1">// in some cases combine() calls can simply be forwarded to reduce().</span> - <span class="k">this</span><span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="n">in</span><span class="o">,</span> <span class="n">out</span><span class="o">);</span> - <span class="o">}</span> -<span class="o">}</span></code></pre></div> - - </div> - <div data-lang="scala"> - - <div class="highlight"><pre><code class="language-scala"><span class="c1">// Combinable GroupReduceFunction that computes two sums.</span> -<span class="c1">// Note that we use the RichGroupReduceFunction because it defines the combine method</span> -<span class="nd">@Combinable</span> -<span class="k">class</span> <span class="nc">MyCombinableGroupReducer</span> - <span class="k">extends</span> <span class="nc">RichGroupReduceFunction</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)</span>, <span class="o">(</span><span class="kt">String</span>, <span class="kt">Int</span>, 
<span class="kt">Double</span><span class="o">)]</span> <span class="o">{}</span> - - <span class="k">def</span> <span class="n">reduce</span><span class="o">(</span> - <span class="n">in</span><span class="k">:</span> <span class="kt">java.lang.Iterable</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)],</span> - <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)])</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> - - <span class="k">val</span> <span class="n">key</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="kc">null</span> - <span class="k">val</span> <span class="n">intSum</span> <span class="k">=</span> <span class="mi">0</span> - <span class="k">val</span> <span class="n">doubleSum</span> <span class="k">=</span> <span class="mf">0.0</span> - - <span class="k">for</span> <span class="o">(</span><span class="n">curr</span> <span class="k"><-</span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span> - <span class="n">key</span> <span class="k">=</span> <span class="n">curr</span><span class="o">.</span><span class="n">_1</span> - <span class="n">intSum</span> <span class="o">+=</span> <span class="n">curr</span><span class="o">.</span><span class="n">_2</span> - <span class="n">doubleSum</span> <span class="o">+=</span> <span class="n">curr</span><span class="o">.</span><span class="n">_3</span> - <span class="o">}</span> - <span class="c1">// emit a tuple with both sums</span> - <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">intSum</span><span 
class="o">,</span> <span class="n">doubleSum</span><span class="o">);</span> - <span class="o">}</span> - - <span class="k">def</span> <span class="n">combine</span><span class="o">(</span> - <span class="n">in</span><span class="k">:</span> <span class="kt">java.lang.Iterable</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)],</span> - <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)])</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> - <span class="c1">// in some cases combine() calls can simply be forwarded to reduce().</span> - <span class="k">this</span><span class="o">.</span><span class="n">reduce</span><span class="o">(</span><span class="n">in</span><span class="o">,</span> <span class="n">out</span><span class="o">)</span> - <span class="o">}</span> -<span class="o">}</span></code></pre></div> - - </div> - <div data-lang="python"> - - <div class="highlight"><pre><code class="language-python"> <span class="k">class</span> <span class="nc">GroupReduce</span><span class="p">(</span><span class="n">GroupReduceFunction</span><span class="p">):</span> - <span class="k">def</span> <span class="nf">reduce</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">):</span> - <span class="n">key</span><span class="p">,</span> <span class="n">int_sum</span><span class="p">,</span> <span class="n">float_sum</span> <span class="o">=</span> <span class="n">iterator</span><span class="o">.</span><span class="n">next</span><span class="p">()</span> - <span class="k">for</span> <span class="n">value</span> <span class="ow">in</span> 
<span class="n">iterator</span><span class="p">:</span> - <span class="n">int_sum</span> <span class="o">+=</span> <span class="n">value</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> - <span class="n">float_sum</span> <span class="o">+=</span> <span class="n">value</span><span class="p">[</span><span class="mi">2</span><span class="p">]</span> - <span class="n">collector</span><span class="o">.</span><span class="n">collect</span><span class="p">((</span><span class="n">key</span><span class="p">,</span> <span class="n">int_sum</span><span class="p">,</span> <span class="n">float_sum</span><span class="p">))</span> - <span class="c"># in some cases combine() calls can simply be forwarded to reduce().</span> - <span class="k">def</span> <span class="nf">combine</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">):</span> - <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">reduce</span><span class="p">(</span><span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">)</span> - -<span class="n">data</span><span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="n">GroupReduce</span><span class="p">(),</span> <span class="p">(</span><span class="n">STRING</span><span class="p">,</span> <span class="n">INT</span><span class="p">,</span> <span class="n">FLOAT</span><span class="p">),</span> <span class="n">combinable</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span></code></pre></div> - - </div> -</div> - -<h3 id="groupcombine-on-a-grouped-dataset">GroupCombine on a Grouped DataSet</h3> - -<p>The GroupCombine transformation is the generalized form of the combine step in -the Combinable GroupReduceFunction. 
It is generalized in the sense that it -allows combining of input type <code>I</code> to an arbitrary output type <code>O</code>. In contrast, -the combine step in the GroupReduce only allows combining from input type <code>I</code> to -output type <code>I</code>. This is because the reduce step in the GroupReduceFunction -expects input type <code>I</code>.</p> - -<p>In some applications, it is desirable to combine a DataSet into an intermediate -format before performing additional transformations (e.g. to reduce data -size). This can be achieved with a CombineGroup transformation at very little -cost.</p> - -<p><strong>Note:</strong> The GroupCombine on a Grouped DataSet is performed in memory with a - greedy strategy which may not process all data at once but in multiple - steps. It is also performed on the individual partitions without a data - exchange like in a GroupReduce transformation. This may lead to partial - results.</p> - -<p>The following example demonstrates the use of a CombineGroup transformation for -an alternative WordCount implementation. 
In the implementation,</p> - -<div class="codetabs"> - <div data-lang="java"> - - <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">input</span> <span class="o">=</span> <span class="o">[..]</span> <span class="c1">// The words received as input</span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">groupedInput</span> <span class="o">=</span> <span class="n">input</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> <span class="c1">// group identical words</span> - -<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">combinedWords</span> <span class="o">=</span> <span class="n">groupedInput</span><span class="o">.</span><span class="na">combineGroup</span><span class="o">(</span><span class="k">new</span> <span class="n">GroupCombineFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>()</span> <span class="o">{</span> - - <span class="kd">public</span> <span class="kt">void</span> <span class="nf">combine</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">words</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>>)</span> 
<span class="o">{</span> <span class="c1">// combine</span> - <span class="kt">int</span> <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> - <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span> - <span class="n">count</span><span class="o">++;</span> - <span class="o">}</span> - <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">Tuple2</span><span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="n">count</span><span class="o">));</span> - <span class="o">}</span> -<span class="o">});</span> - -<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">groupedCombinedWords</span> <span class="o">=</span> <span class="n">combinedWords</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> <span class="c1">// group by words again</span> - -<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">output</span> <span class="o">=</span> <span class="n">combinedWords</span><span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="nf">GroupReduceFunction</span><span class="o">()</span> <span class="o">{</span> <span class="c1">// group reduce with full data exchange</span> - - <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">reduce</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>>,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>>)</span> <span class="o">{</span> - <span class="kt">int</span> <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> - <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">word</span> <span class="o">:</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span> - <span class="n">count</span><span class="o">++;</span> - <span class="o">}</span> - <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">Tuple2</span><span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="n">count</span><span class="o">));</span> - <span class="o">}</span> -<span class="o">});</span></code></pre></div> - - </div> - <div data-lang="scala"> - - <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">[</span><span class="kt">..</span><span class="o">]</span> <span class="c1">// The words received as input</span> -<span class="k">val</span> <span class="n">groupedInput</span><span class="k">:</span> <span 
class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> - -<span class="k">val</span> <span class="n">combinedWords</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="n">groupedInput</span><span class="o">.</span><span class="n">combineGroup</span> <span class="o">{</span> - <span class="o">(</span><span class="n">words</span><span class="o">,</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)])</span> <span class="k">=></span> - <span class="k">var</span> <span class="n">count</span> <span class="k">=</span> <span class="mi">0</span> - <span class="k">for</span> <span class="o">(</span><span class="n">word</span> <span class="k"><-</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span> - <span class="n">count</span><span class="o">++</span> - <span class="o">}</span> - <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="n">count</span><span class="o">)</span> -<span class="o">}</span> - -<span class="k">val</span> <span class="n">groupedCombinedWords</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="n">combinedWords</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span 
class="o">)</span> - -<span class="k">val</span> <span class="n">output</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="n">groupedInput</span><span class="o">.</span><span class="n">reduceGroup</span> <span class="o">{</span> - <span class="o">(</span><span class="n">words</span><span class="o">,</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)])</span> <span class="k">=></span> - <span class="k">var</span> <span class="n">count</span> <span class="k">=</span> <span class="mi">0</span> - <span class="k">for</span> <span class="o">((</span><span class="n">word</span><span class="o">,</span> <span class="nc">Int</span><span class="o">)</span> <span class="k"><-</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span> - <span class="n">count</span><span class="o">++</span> - <span class="o">}</span> - <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="n">count</span><span class="o">)</span> -<span class="o">}</span></code></pre></div> - - </div> -</div> - -<p>The above alternative WordCount implementation demonstrates how the GroupCombine -combines words before performing the GroupReduce transformation. The above -example is just a proof of concept. Note how the combine step changes the type -of the DataSet, which would normally require an additional Map transformation -before executing the GroupReduce.</p> - -<h3 id="aggregate-on-grouped-tuple-dataset">Aggregate on Grouped Tuple DataSet</h3> - -<p>There are some common aggregation operations that are frequently used. 
The Aggregate transformation provides the following built-in aggregation functions:</p> - -<ul> - <li>Sum,</li> - <li>Min, and</li> - <li>Max.</li> -</ul> - -<p>The Aggregate transformation can only be applied on a Tuple DataSet and supports only field position keys for grouping.</p> - -<p>The following code shows how to apply an Aggregate transformation on a DataSet grouped by field position keys:</p> - -<div class="codetabs"> - <div data-lang="java"> - - <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span> -<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="n">output</span> <span class="o">=</span> <span class="n">input</span> - <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="c1">// group DataSet on second field</span> - <span class="o">.</span><span class="na">aggregate</span><span class="o">(</span><span class="n">SUM</span><span class="o">,</span> <span class="mi">0</span><span class="o">)</span> <span class="c1">// compute sum of the first field</span> - <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">MIN</span><span class="o">,</span> <span class="mi">2</span><span class="o">);</span> <span class="c1">// compute minimum of the third field</span></code></pre></div> - - </div> - <div data-lang="scala"> - - <div class="highlight"><pre><code 
class="language-scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span> -<span class="k">val</span> <span class="n">output</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi
<TRUNCATED>