http://git-wip-us.apache.org/repos/asf/flink-web/blob/396616d4/content/docs/master/apis/dataset_transformations.html ---------------------------------------------------------------------- diff --git a/content/docs/master/apis/dataset_transformations.html b/content/docs/master/apis/dataset_transformations.html new file mode 100644 index 0000000..8fa85de --- /dev/null +++ b/content/docs/master/apis/dataset_transformations.html @@ -0,0 +1,1722 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<!DOCTYPE html> + +<html lang="en"> + <head> + <meta charset="utf-8"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1"> + <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags --> + + <title>Apache Flink 0.10-SNAPSHOT Documentation: DataSet Transformations</title> + + <link rel="shortcut icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon"> + <link rel="icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon"> + + <!-- Bootstrap --> + <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css"> + <link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/flink.css"> + <link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/syntax.css"> + <link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/codetabs.css"> + + <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> + <!-- WARNING: Respond.js doesn't work if you view the page via file:// --> + <!--[if lt IE 9]> + <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> + <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> + <![endif]--> + </head> + <body> + + + + + + + <!-- Top navbar. --> + <nav class="navbar navbar-default navbar-fixed-top"> + <div class="container"> + <!-- The logo. 
--> + <div class="navbar-header"> + <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1"> + <span class="icon-bar"></span> + <span class="icon-bar"></span> + <span class="icon-bar"></span> + </button> + <div class="navbar-logo"> + <a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/master/page/img/navbar-brand-logo.jpg"></a> + </div> + </div><!-- /.navbar-header --> + + <!-- The navigation links. --> + <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1"> + <ul class="nav navbar-nav"> + <li><a href="http://flink.apache.org/docs/master/index.html">Overview<span class="hidden-sm hidden-xs"> 0.10</span></a></li> + + <!-- Setup --> + <li class="dropdown"> + <a href="http://flink.apache.org/docs/master/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a> + <ul class="dropdown-menu" role="menu"> + <li><a href="http://flink.apache.org/docs/master/setup/building.html">Get Flink 0.10-SNAPSHOT</a></li> + + <li class="divider"></li> + <li role="presentation" class="dropdown-header"><strong>Deployment</strong></li> + <li><a href="http://flink.apache.org/docs/master/setup/local_setup.html" class="active">Local</a></li> + <li><a href="http://flink.apache.org/docs/master/setup/cluster_setup.html">Cluster (Standalone)</a></li> + <li><a href="http://flink.apache.org/docs/master/setup/yarn_setup.html">YARN</a></li> + <li><a href="http://flink.apache.org/docs/master/setup/gce_setup.html">GCloud</a></li> + <li><a href="http://flink.apache.org/docs/master/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li> + + <li class="divider"></li> + <li><a href="http://flink.apache.org/docs/master/setup/config.html">Configuration</a></li> + </ul> + </li> + + <!-- Programming Guides --> + <li class="dropdown"> + <a href="http://flink.apache.org/docs/master/apis" 
class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a> + <ul class="dropdown-menu" role="menu"> + <li><a href="http://flink.apache.org/docs/master/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li> + <li><a href="http://flink.apache.org/docs/master/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li> + <li><a href="http://flink.apache.org/docs/master/apis/python.html">Python API <span class="badge">Beta</span></a></li> + + <li class="divider"></li> + <li><a href="scala_shell.html">Interactive Scala Shell</a></li> + <li><a href="http://flink.apache.org/docs/master/apis/dataset_transformations.html">Dataset Transformations</a></li> + <li><a href="http://flink.apache.org/docs/master/apis/best_practices.html">Best Practices</a></li> + <li><a href="http://flink.apache.org/docs/master/apis/example_connectors.html">Connectors</a></li> + <li><a href="http://flink.apache.org/docs/master/apis/examples.html">Examples</a></li> + <li><a href="http://flink.apache.org/docs/master/apis/local_execution.html">Local Execution</a></li> + <li><a href="http://flink.apache.org/docs/master/apis/cluster_execution.html">Cluster Execution</a></li> + <li><a href="http://flink.apache.org/docs/master/apis/cli.html">Command Line Interface</a></li> + <li><a href="http://flink.apache.org/docs/master/apis/web_client.html">Web Client</a></li> + <li><a href="http://flink.apache.org/docs/master/apis/iterations.html">Iterations</a></li> + <li><a href="http://flink.apache.org/docs/master/apis/java8.html">Java 8</a></li> + <li><a href="http://flink.apache.org/docs/master/apis/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li> + </ul> + </li> + + <!-- Libraries --> + <li class="dropdown"> + <a href="http://flink.apache.org/docs/master/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" 
aria-expanded="false">Libraries <span class="caret"></span></a> + <ul class="dropdown-menu" role="menu"> + <li><a href="http://flink.apache.org/docs/master/libs/spargel_guide.html">Graphs: Spargel</a></li> + <li><a href="http://flink.apache.org/docs/master/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li> + <li><a href="http://flink.apache.org/docs/master/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li> + <li><a href="http://flink.apache.org/docs/master/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li> + </ul> + </li> + + <!-- Internals --> + <li class="dropdown"> + <a href="http://flink.apache.org/docs/master/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a> + <ul class="dropdown-menu" role="menu"> + <li role="presentation" class="dropdown-header"><strong>Contribute</strong></li> + <li><a href="http://flink.apache.org/docs/master/internals/how_to_contribute.html">How to Contribute</a></li> + <li><a href="http://flink.apache.org/docs/master/internals/coding_guidelines.html">Coding Guidelines</a></li> + <li><a href="http://flink.apache.org/docs/master/internals/ide_setup.html">IDE Setup</a></li> + <li><a href="http://flink.apache.org/docs/master/internals/logging.html">Logging</a></li> + <li class="divider"></li> + <li role="presentation" class="dropdown-header"><strong>Internals</strong></li> + <li><a href="http://flink.apache.org/docs/master/internals/general_arch.html">Architecture & Process Model</a></li> + <li><a href="http://flink.apache.org/docs/master/internals/types_serialization.html">Type Extraction & Serialization</a></li> + <li><a href="http://flink.apache.org/docs/master/internals/job_scheduling.html">Jobs & Scheduling</a></li> + <li><a href="http://flink.apache.org/docs/master/internals/add_operator.html">How-To: Add an Operator</a></li> + </ul> + </li> + </ul> + <form class="navbar-form 
navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/master/search-results.html"> + <div class="form-group"> + <input type="text" class="form-control" name="q" placeholder="Search all pages"> + </div> + <button type="submit" class="btn btn-default">Search</button> + </form> + </div><!-- /.navbar-collapse --> + </div><!-- /.container --> + </nav> + + + + + <!-- Main content. --> + <div class="container"> + + +<div class="row"> + <div class="col-sm-10 col-sm-offset-1"> + <h1>DataSet Transformations</h1> + + + +<p>This document gives a deep-dive into the available transformations on DataSets. For a general introduction to the +Flink Java API, please refer to the <a href="programming_guide.html">Programming Guide</a>.</p> + +<p>For zipping elements in a data set with a dense index, please refer to the <a href="zip_elements_guide.html">Zip Elements Guide</a>.</p> + +<ul id="markdown-toc"> + <li><a href="#map" id="markdown-toc-map">Map</a></li> + <li><a href="#flatmap" id="markdown-toc-flatmap">FlatMap</a></li> + <li><a href="#mappartition" id="markdown-toc-mappartition">MapPartition</a></li> + <li><a href="#filter" id="markdown-toc-filter">Filter</a></li> + <li><a href="#project-tuple-datasets-only-javapython-api-only" id="markdown-toc-project-tuple-datasets-only-javapython-api-only">Project (Tuple DataSets only) (Java/Python API Only)</a></li> + <li><a href="#transformations-on-grouped-dataset" id="markdown-toc-transformations-on-grouped-dataset">Transformations on Grouped DataSet</a></li> + <li><a href="#transformations-on-grouped-dataset-1" id="markdown-toc-transformations-on-grouped-dataset-1">Transformations on Grouped DataSet</a></li> + <li><a href="#reduce-on-grouped-dataset" id="markdown-toc-reduce-on-grouped-dataset">Reduce on Grouped DataSet</a></li> + <li><a href="#groupreduce-on-grouped-dataset" id="markdown-toc-groupreduce-on-grouped-dataset">GroupReduce on Grouped DataSet</a></li> + <li><a 
href="#groupcombine-on-a-grouped-dataset" id="markdown-toc-groupcombine-on-a-grouped-dataset">GroupCombine on a Grouped DataSet</a></li> + <li><a href="#aggregate-on-grouped-tuple-dataset" id="markdown-toc-aggregate-on-grouped-tuple-dataset">Aggregate on Grouped Tuple DataSet</a></li> + <li><a href="#reduce-on-full-dataset" id="markdown-toc-reduce-on-full-dataset">Reduce on full DataSet</a></li> + <li><a href="#groupreduce-on-full-dataset" id="markdown-toc-groupreduce-on-full-dataset">GroupReduce on full DataSet</a></li> + <li><a href="#groupcombine-on-a-full-dataset" id="markdown-toc-groupcombine-on-a-full-dataset">GroupCombine on a full DataSet</a></li> + <li><a href="#aggregate-on-full-tuple-dataset" id="markdown-toc-aggregate-on-full-tuple-dataset">Aggregate on full Tuple DataSet</a></li> + <li><a href="#join" id="markdown-toc-join">Join</a></li> + <li><a href="#cross" id="markdown-toc-cross">Cross</a></li> + <li><a href="#cogroup" id="markdown-toc-cogroup">CoGroup</a></li> + <li><a href="#union" id="markdown-toc-union">Union</a></li> + <li><a href="#rebalance" id="markdown-toc-rebalance">Rebalance</a></li> + <li><a href="#hash-partition" id="markdown-toc-hash-partition">Hash-Partition</a></li> + <li><a href="#sort-partition" id="markdown-toc-sort-partition">Sort Partition</a></li> + <li><a href="#first-n" id="markdown-toc-first-n">First-n</a></li> +</ul> + +<h3 id="map">Map</h3> + +<p>The Map transformation applies a user-defined map function on each element of a DataSet. 
+It implements a one-to-one mapping, that is, exactly one element must be returned by +the function.</p> + +<p>The following code transforms a DataSet of Integer pairs into a DataSet of Integers:</p> + +<div class="codetabs"> + <div data-lang="java"> + + <div class="highlight"><pre><code class="language-java"><span class="c1">// MapFunction that adds two integer values</span> +<span class="kd">public</span> <span class="kd">class</span> <span class="nc">IntAdder</span> <span class="kd">implements</span> <span class="n">MapFunction</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>,</span> <span class="n">Integer</span><span class="o">></span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span> + <span class="k">return</span> <span class="n">in</span><span class="o">.</span><span class="na">f0</span> <span class="o">+</span> <span class="n">in</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span> + <span class="o">}</span> +<span class="o">}</span> + +<span class="c1">// [...]</span> +<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">intPairs</span> <span class="o">=</span> <span class="c1">// [...]</span> +<span class="n">DataSet</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">intSums</span> <span class="o">=</span> 
<span class="n">intPairs</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="nf">IntAdder</span><span class="o">());</span></code></pre></div> + + </div> + <div data-lang="scala"> + + <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">intPairs</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span> +<span class="k">val</span> <span class="n">intSums</span> <span class="k">=</span> <span class="n">intPairs</span><span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="n">pair</span> <span class="k">=></span> <span class="n">pair</span><span class="o">.</span><span class="n">_1</span> <span class="o">+</span> <span class="n">pair</span><span class="o">.</span><span class="n">_2</span> <span class="o">}</span></code></pre></div> + + </div> + <div data-lang="python"> + + <div class="highlight"><pre><code class="language-python"> <span class="n">intSums</span> <span class="o">=</span> <span class="n">intPairs</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">sum</span><span class="p">(</span><span class="n">x</span><span class="p">),</span> <span class="n">INT</span><span class="p">)</span></code></pre></div> + + </div> +</div> + +<h3 id="flatmap">FlatMap</h3> + +<p>The FlatMap transformation applies a user-defined flat-map function on each element of a DataSet. 
+This variant of a map function can return arbitrarily many result elements (including none) for each input element.</p> + +<p>The following code transforms a DataSet of text lines into a DataSet of words:</p> + +<div class="codetabs"> + <div data-lang="java"> + + <div class="highlight"><pre><code class="language-java"><span class="c1">// FlatMapFunction that tokenizes a String on non-word characters and emits all String tokens.</span> +<span class="kd">public</span> <span class="kd">class</span> <span class="nc">Tokenizer</span> <span class="kd">implements</span> <span class="n">FlatMapFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> + <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">token</span> <span class="o">:</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">"\\W"</span><span class="o">))</span> <span class="o">{</span> + <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">token</span><span class="o">);</span> + <span class="o">}</span> + <span class="o">}</span> +<span class="o">}</span> + +<span class="c1">// [...]</span> +<span class="n">DataSet</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">textLines</span> <span class="o">=</span> <span class="c1">// [...]</span> +<span 
class="n">DataSet</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">words</span> <span class="o">=</span> <span class="n">textLines</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">Tokenizer</span><span class="o">());</span></code></pre></div> + + </div> + <div data-lang="scala"> + + <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">textLines</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span> +<span class="k">val</span> <span class="n">words</span> <span class="k">=</span> <span class="n">textLines</span><span class="o">.</span><span class="n">flatMap</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">" "</span><span class="o">)</span> <span class="o">}</span></code></pre></div> + + </div> + <div data-lang="python"> + + <div class="highlight"><pre><code class="language-python"> <span class="n">words</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">flat_map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span><span class="n">c</span><span class="p">:</span> <span class="p">[</span><span class="n">line</span><span class="o">.</span><span class="n">split</span><span class="p">()</span> <span class="k">for</span> <span class="n">line</span> <span class="ow">in</span> <span class="n">x</span><span class="p">],</span> <span class="n">STRING</span><span class="p">)</span></code></pre></div> + + </div> +</div> + +<h3 id="mappartition">MapPartition</h3> + +<p>MapPartition transforms a parallel partition in a single function call. 
The map-partition function +gets the partition as Iterable and can produce an arbitrary number of result values. The number of elements in each partition depends on the degree-of-parallelism +and previous operations.</p> + +<p>The following code transforms a DataSet of text lines into a DataSet of counts per partition:</p> + +<div class="codetabs"> + <div data-lang="java"> + + <div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">PartitionCounter</span> <span class="kd">implements</span> <span class="n">MapPartitionFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="o">{</span> + + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">mapPartition</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">Long</span><span class="o">></span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> + <span class="kt">long</span> <span class="n">c</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> + <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">s</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span> + <span class="n">c</span><span class="o">++;</span> + <span class="o">}</span> + <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">c</span><span class="o">);</span> + <span class="o">}</span> +<span class="o">}</span> + +<span class="c1">// [...]</span> +<span class="n">DataSet</span><span class="o"><</span><span 
class="n">String</span><span class="o">></span> <span class="n">textLines</span> <span class="o">=</span> <span class="c1">// [...]</span> +<span class="n">DataSet</span><span class="o"><</span><span class="n">Long</span><span class="o">></span> <span class="n">counts</span> <span class="o">=</span> <span class="n">textLines</span><span class="o">.</span><span class="na">mapPartition</span><span class="o">(</span><span class="k">new</span> <span class="nf">PartitionCounter</span><span class="o">());</span></code></pre></div> + + </div> + <div data-lang="scala"> + + <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">textLines</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span> +<span class="c1">// Some is required because the return value must be a Collection.</span> +<span class="c1">// There is an implicit conversion from Option to a Collection.</span> +<span class="k">val</span> <span class="n">counts</span> <span class="k">=</span> <span class="n">textLines</span><span class="o">.</span><span class="n">mapPartition</span> <span class="o">{</span> <span class="n">in</span> <span class="k">=></span> <span class="nc">Some</span><span class="o">(</span><span class="n">in</span><span class="o">.</span><span class="n">size</span><span class="o">)</span> <span class="o">}</span></code></pre></div> + + </div> + <div data-lang="python"> + + <div class="highlight"><pre><code class="language-python"> <span class="n">counts</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">map_partition</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span><span class="n">c</span><span class="p">:</span> <span class="p">[</span><span class="nb">sum</span><span class="p">(</span><span 
class="mi">1</span> <span class="k">for</span> <span class="n">_</span> <span class="ow">in</span> <span class="n">x</span><span class="p">)],</span> <span class="n">INT</span><span class="p">)</span></code></pre></div> + + </div> +</div> + +<h3 id="filter">Filter</h3> + +<p>The Filter transformation applies a user-defined filter function on each element of a DataSet and retains only those elements for which the function returns <code>true</code>.</p> + +<p>The following code removes all Integers smaller than zero from a DataSet:</p> + +<div class="codetabs"> + <div data-lang="java"> + + <div class="highlight"><pre><code class="language-java"><span class="c1">// FilterFunction that filters out all Integers smaller than zero.</span> +<span class="kd">public</span> <span class="kd">class</span> <span class="nc">NaturalNumberFilter</span> <span class="kd">implements</span> <span class="n">FilterFunction</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">filter</span><span class="o">(</span><span class="n">Integer</span> <span class="n">number</span><span class="o">)</span> <span class="o">{</span> + <span class="k">return</span> <span class="n">number</span> <span class="o">>=</span> <span class="mi">0</span><span class="o">;</span> + <span class="o">}</span> +<span class="o">}</span> + +<span class="c1">// [...]</span> +<span class="n">DataSet</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">intNumbers</span> <span class="o">=</span> <span class="c1">// [...]</span> +<span class="n">DataSet</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">naturalNumbers</span> <span class="o">=</span> <span class="n">intNumbers</span><span class="o">.</span><span class="na">filter</span><span 
class="o">(</span><span class="k">new</span> <span class="nf">NaturalNumberFilter</span><span class="o">());</span></code></pre></div> + + </div> + <div data-lang="scala"> + + <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">intNumbers</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span> +<span class="k">val</span> <span class="n">naturalNumbers</span> <span class="k">=</span> <span class="n">intNumbers</span><span class="o">.</span><span class="n">filter</span> <span class="o">{</span> <span class="k">_</span> <span class="o">>=</span> <span class="mi">0</span> <span class="o">}</span></code></pre></div> + + </div> + <div data-lang="python"> + + <div class="highlight"><pre><code class="language-python"> <span class="n">naturalNumbers</span> <span class="o">=</span> <span class="n">intNumbers</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span> <span class="o">>=</span> <span class="mi">0</span><span class="p">)</span></code></pre></div> + + </div> +</div> + +<p><strong>IMPORTANT:</strong> The system assumes that the function does not modify the elements to which the predicate is applied. Violating this assumption +can lead to incorrect results.</p> + +<h3 id="project-tuple-datasets-only-javapython-api-only">Project (Tuple DataSets only) (Java/Python API Only)</h3> + +<p>The Project transformation removes or moves Tuple fields of a Tuple DataSet. 
+The <code>project(int...)</code> method selects Tuple fields that should be retained by their index and defines their order in the output Tuple.</p> + +<p>Projections do not require the definition of a user function.</p> + +<p>The following code shows different ways to apply a Project transformation on a DataSet:</p> + +<div class="codetabs"> + <div data-lang="java"> + + <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">in</span> <span class="o">=</span> <span class="c1">// [...]</span> +<span class="c1">// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer></span> +<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">out</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">project</span><span class="o">(</span><span class="mi">2</span><span class="o">,</span><span class="mi">0</span><span class="o">);</span></code></pre></div> + + <h4 id="projection-with-type-hint">Projection with Type Hint</h4> + + <p>Note that the Java compiler cannot infer the return type of the <code>project</code> operator. 
This can cause a problem if you call another operator on the result of the <code>project</code> operator, such as:</p> + + <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple5</span><span class="o"><</span><span class="n">String</span><span class="o">,</span><span class="n">String</span><span class="o">,</span><span class="n">String</span><span class="o">,</span><span class="n">String</span><span class="o">,</span><span class="n">String</span><span class="o">>></span> <span class="n">ds</span> <span class="o">=</span> <span class="o">....</span> +<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple1</span><span class="o"><</span><span class="n">String</span><span class="o">>></span> <span class="n">ds2</span> <span class="o">=</span> <span class="n">ds</span><span class="o">.</span><span class="na">project</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">distinct</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span></code></pre></div> + + <p>This problem can be overcome by hinting the return type of the <code>project</code> operator like this:</p> + + <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple1</span><span class="o"><</span><span class="n">String</span><span class="o">>></span> <span class="n">ds2</span> <span class="o">=</span> <span class="n">ds</span><span class="o">.<</span><span class="n">Tuple1</span><span class="o"><</span><span class="n">String</span><span class="o">>></span><span class="n">project</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">distinct</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span></code></pre></div> + + <h3 id="transformations-on-grouped-dataset">Transformations on Grouped DataSet</h3> + + <p>The reduce 
operations can operate on grouped data sets. Specifying the key to +be used for grouping can be done in many ways:</p> + + <ul> + <li>key expressions</li> + <li>a key-selector function</li> + <li>one or more field position keys (Tuple DataSet only)</li> + <li>Case Class fields (Case Classes only)</li> + </ul> + + <p>Please look at the reduce examples to see how the grouping keys are specified.</p> + + </div> + <div data-lang="python"> + + <div class="highlight"><pre><code class="language-python"><span class="n">out</span> <span class="o">=</span> <span class="ow">in</span><span class="o">.</span><span class="n">project</span><span class="p">(</span><span class="mi">2</span><span class="p">,</span><span class="mi">0</span><span class="p">);</span></code></pre></div> + + <h3 id="transformations-on-grouped-dataset-1">Transformations on Grouped DataSet</h3> + + <p>The reduce operations can operate on grouped data sets. Specifying the key to +be used for grouping can be done using one or more field position keys (Tuple DataSet only).</p> + + <p>Please look at the reduce examples to see how the grouping keys are specified.</p> + + </div> +</div> + +<h3 id="reduce-on-grouped-dataset">Reduce on Grouped DataSet</h3> + +<p>A Reduce transformation that is applied on a grouped DataSet reduces each group to a single +element using a user-defined reduce function. +For each group of input elements, a reduce function successively combines pairs of elements into one +element until only a single element for each group remains.</p> + +<h4 id="reduce-on-dataset-grouped-by-keyselector-function">Reduce on DataSet Grouped by KeySelector Function</h4> + +<p>A key-selector function extracts a key value from each element of a DataSet. The extracted key +value is used to group the DataSet. 
+The following code shows how to group a POJO DataSet using a key-selector function and to reduce it +with a reduce function.</p> + +<div class="codetabs"> + <div data-lang="java"> + + <div class="highlight"><pre><code class="language-java"><span class="c1">// some ordinary POJO</span> +<span class="kd">public</span> <span class="kd">class</span> <span class="nc">WC</span> <span class="o">{</span> + <span class="kd">public</span> <span class="n">String</span> <span class="n">word</span><span class="o">;</span> + <span class="kd">public</span> <span class="kt">int</span> <span class="n">count</span><span class="o">;</span> + <span class="c1">// [...]</span> +<span class="o">}</span> + +<span class="c1">// ReduceFunction that sums Integer attributes of a POJO</span> +<span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordCounter</span> <span class="kd">implements</span> <span class="n">ReduceFunction</span><span class="o"><</span><span class="n">WC</span><span class="o">></span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">WC</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">WC</span> <span class="n">in1</span><span class="o">,</span> <span class="n">WC</span> <span class="n">in2</span><span class="o">)</span> <span class="o">{</span> + <span class="k">return</span> <span class="k">new</span> <span class="nf">WC</span><span class="o">(</span><span class="n">in1</span><span class="o">.</span><span class="na">word</span><span class="o">,</span> <span class="n">in1</span><span class="o">.</span><span class="na">count</span> <span class="o">+</span> <span class="n">in2</span><span class="o">.</span><span class="na">count</span><span class="o">);</span> + <span class="o">}</span> +<span class="o">}</span> + +<span class="c1">// [...]</span> +<span class="n">DataSet</span><span class="o"><</span><span class="n">WC</span><span class="o">></span> <span 
class="n">words</span> <span class="o">=</span> <span class="c1">// [...]</span> +<span class="n">DataSet</span><span class="o"><</span><span class="n">WC</span><span class="o">></span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">words</span> + <span class="c1">// DataSet grouping on field "word"</span> + <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">"word"</span><span class="o">)</span> + <span class="c1">// apply ReduceFunction on grouped DataSet</span> + <span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nf">WordCounter</span><span class="o">());</span></code></pre></div> + + </div> + <div data-lang="scala"> + + <div class="highlight"><pre><code class="language-scala"><span class="c1">// some ordinary POJO</span> +<span class="k">class</span> <span class="nc">WC</span><span class="o">(</span><span class="k">val</span> <span class="n">word</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="k">val</span> <span class="n">count</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> <span class="o">{</span> + <span class="k">def</span> <span class="k">this</span><span class="o">()</span> <span class="o">{</span> + <span class="k">this</span><span class="o">(</span><span class="kc">null</span><span class="o">,</span> <span class="o">-</span><span class="mi">1</span><span class="o">)</span> + <span class="o">}</span> + <span class="c1">// [...]</span> +<span class="o">}</span> + +<span class="k">val</span> <span class="n">words</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">WC</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span> +<span class="k">val</span> <span class="n">wordCounts</span> <span class="k">=</span> <span class="n">words</span><span 
class="o">.</span><span class="n">groupBy</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">word</span> <span class="o">}</span> <span class="n">reduce</span> <span class="o">{</span> + <span class="o">(</span><span class="n">w1</span><span class="o">,</span> <span class="n">w2</span><span class="o">)</span> <span class="k">=></span> <span class="k">new</span> <span class="nc">WC</span><span class="o">(</span><span class="n">w1</span><span class="o">.</span><span class="n">word</span><span class="o">,</span> <span class="n">w1</span><span class="o">.</span><span class="n">count</span> <span class="o">+</span> <span class="n">w2</span><span class="o">.</span><span class="n">count</span><span class="o">)</span> +<span class="o">}</span></code></pre></div> + + </div> + <div data-lang="python"> + + <div class="highlight"><pre><code class="language-python"><span class="n">Not</span> <span class="n">supported</span><span class="o">.</span></code></pre></div> + </div> +</div> + +<h4 id="reduce-on-dataset-grouped-by-field-position-keys-tuple-datasets-only">Reduce on DataSet Grouped by Field Position Keys (Tuple DataSets only)</h4> + +<p>Field position keys specify one or more fields of a Tuple DataSet that are used as grouping keys. 
+The following code shows how to use field position keys and apply a reduce function.</p> + +<div class="codetabs"> + <div data-lang="java"> + + <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="n">tuples</span> <span class="o">=</span> <span class="c1">// [...]</span> +<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="n">reducedTuples</span> <span class="o">=</span> + <span class="n">tuples</span> + <span class="c1">// group DataSet on first and second field of Tuple</span> + <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span><span class="mi">1</span><span class="o">)</span> + <span class="c1">// apply ReduceFunction on grouped DataSet</span> + <span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nf">MyTupleReducer</span><span class="o">());</span></code></pre></div> + + </div> + <div data-lang="scala"> + + <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">tuples</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span> +<span class="c1">// group on the first and second Tuple field</span> +<span class="k">val</span> <span class="n">reducedTuples</span> <span 
class="k">=</span> <span class="n">tuples</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="mi">1</span><span class="o">).</span><span class="n">reduce</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span></code></pre></div> + + <h4 id="reduce-on-dataset-grouped-by-case-class-fields">Reduce on DataSet Grouped by Case Class Fields</h4> + + <p>When using Case Classes, you can also specify the grouping key using the names of the fields:</p> + + <div class="highlight"><pre><code class="language-scala"><span class="k">case</span> <span class="k">class</span> <span class="nc">MyClass</span><span class="o">(</span><span class="k">val</span> <span class="n">a</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">b</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">c</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span> +<span class="k">val</span> <span class="n">tuples</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">MyClass</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span> +<span class="c1">// group on the first and second field</span> +<span class="k">val</span> <span class="n">reducedTuples</span> <span class="k">=</span> <span class="n">tuples</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"a"</span><span class="o">,</span> <span class="s">"b"</span><span class="o">).</span><span class="n">reduce</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span></code></pre></div> + + </div> + <div data-lang="python"> + + <div class="highlight"><pre><code class="language-python"> <span class="n">reducedTuples</span> <span class="o">=</span> <span 
class="n">tuples</span><span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span><span class="o">.</span><span class="n">reduce</span><span class="p">(</span> <span class="o">...</span> <span class="p">)</span></code></pre></div> + + </div> +</div> + +<h3 id="groupreduce-on-grouped-dataset">GroupReduce on Grouped DataSet</h3> + +<p>A GroupReduce transformation that is applied on a grouped DataSet calls a user-defined +group-reduce function for each group. The difference +between this and <em>Reduce</em> is that the user-defined function gets the whole group at once. +The function is invoked with an Iterable over all elements of a group and can return an arbitrary +number of result elements.</p> + +<h4 id="groupreduce-on-dataset-grouped-by-field-position-keys-tuple-datasets-only">GroupReduce on DataSet Grouped by Field Position Keys (Tuple DataSets only)</h4> + +<p>The following code shows how duplicate strings can be removed from a DataSet grouped by an Integer key.</p> + +<div class="codetabs"> + <div data-lang="java"> + + <div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">DistinctReduce</span> + <span class="kd">implements</span> <span class="n">GroupReduceFunction</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>,</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="o">{</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span 
class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">in</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> + + <span class="n">Set</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">uniqStrings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashSet</span><span class="o"><</span><span class="n">String</span><span class="o">>();</span> + <span class="n">Integer</span> <span class="n">key</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> + + <span class="c1">// add all strings of the group to the set</span> + <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">t</span> <span class="o">:</span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span> + <span class="n">key</span> <span class="o">=</span> <span class="n">t</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span> + <span class="n">uniqStrings</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">t</span><span class="o">.</span><span class="na">f1</span><span class="o">);</span> + <span class="o">}</span> + + <span class="c1">// emit all unique strings.</span> + <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">s</span> <span class="o">:</span> <span class="n">uniqStrings</span><span class="o">)</span> 
<span class="o">{</span> + <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>(</span><span class="n">key</span><span class="o">,</span> <span class="n">s</span><span class="o">));</span> + <span class="o">}</span> + <span class="o">}</span> +<span class="o">}</span> + +<span class="c1">// [...]</span> +<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span> +<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">output</span> <span class="o">=</span> <span class="n">input</span> + <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="c1">// group DataSet by the first tuple field</span> + <span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="nf">DistinctReduce</span><span class="o">());</span> <span class="c1">// apply GroupReduceFunction</span></code></pre></div> + + </div> + <div data-lang="scala"> + + <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span> +<span class="k">val</span> 
<span class="n">output</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">reduceGroup</span> <span class="o">{</span> + <span class="o">(</span><span class="n">in</span><span class="o">,</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)])</span> <span class="k">=></span> + <span class="n">in</span><span class="o">.</span><span class="n">toSet</span> <span class="n">foreach</span> <span class="o">(</span><span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">)</span> + <span class="o">}</span></code></pre></div> + + <h4 id="groupreduce-on-dataset-grouped-by-case-class-fields">GroupReduce on DataSet Grouped by Case Class Fields</h4> + + <p>Works analogously to grouping by Case Class fields in <em>Reduce</em> transformations.</p> + + </div> + <div data-lang="python"> + + <div class="highlight"><pre><code class="language-python"> <span class="k">class</span> <span class="nc">DistinctReduce</span><span class="p">(</span><span class="n">GroupReduceFunction</span><span class="p">):</span> + <span class="k">def</span> <span class="nf">reduce</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">):</span> + <span class="n">dic</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">()</span> + <span class="k">for</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span> + <span class="n">dic</span><span class="p">[</span><span class="n">value</span><span class="p">[</span><span class="mi">1</span><span class="p">]]</span> <span 
class="o">=</span> <span class="mi">1</span> + <span class="k">for</span> <span class="n">key</span> <span class="ow">in</span> <span class="n">dic</span><span class="o">.</span><span class="n">keys</span><span class="p">():</span> + <span class="n">collector</span><span class="o">.</span><span class="n">collect</span><span class="p">(</span><span class="n">key</span><span class="p">)</span> + + <span class="n">output</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span><span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="n">DistinctReduce</span><span class="p">(),</span> <span class="n">STRING</span><span class="p">)</span></code></pre></div> + + </div> +</div> + +<h4 id="groupreduce-on-dataset-grouped-by-keyselector-function">GroupReduce on DataSet Grouped by KeySelector Function</h4> + +<p>Works analogously to key-selector functions in <em>Reduce</em> transformations.</p> + +<h4 id="groupreduce-on-sorted-groups">GroupReduce on Sorted Groups</h4> + +<p>A group-reduce function accesses the elements of a group using an Iterable. Optionally, the Iterable can hand out the elements of a group in a specified order. 
In many cases this can help to reduce the complexity of a user-defined +group-reduce function and improve its efficiency.</p> + +<p>The following code shows another example of how to remove duplicate Strings in a DataSet grouped by an Integer and sorted by String.</p> + +<div class="codetabs"> + <div data-lang="java"> + + <div class="highlight"><pre><code class="language-java"><span class="c1">// GroupReduceFunction that removes consecutive identical elements</span> +<span class="kd">public</span> <span class="kd">class</span> <span class="nc">DistinctReduce</span> + <span class="kd">implements</span> <span class="n">GroupReduceFunction</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>,</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="o">{</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">in</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> + <span class="n">Integer</span> <span class="n">key</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> + <span class="n">String</span> <span class="n">comp</span> <span class="o">=</span> <span 
class="kc">null</span><span 
class="o">;</span> + + <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">t</span> <span class="o">:</span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span> + <span class="n">key</span> <span class="o">=</span> <span class="n">t</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span> + <span class="n">String</span> <span class="n">next</span> <span class="o">=</span> <span class="n">t</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span> + + <span class="c1">// check if strings are different</span> + <span class="k">if</span> <span class="o">(</span><span class="n">comp</span> <span class="o">==</span> <span class="kc">null</span> <span class="o">||</span> <span class="o">!</span><span class="n">next</span><span class="o">.</span><span class="na">equals</span><span class="o">(</span><span class="n">comp</span><span class="o">))</span> <span class="o">{</span> + <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>(</span><span class="n">key</span><span class="o">,</span> <span class="n">next</span><span class="o">));</span> + <span class="n">comp</span> <span class="o">=</span> <span class="n">next</span><span class="o">;</span> + <span class="o">}</span> + <span class="o">}</span> + <span class="o">}</span> +<span class="o">}</span> + +<span class="c1">// [...]</span> +<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span 
class="o">>></span> 
<span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span> +<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">output</span> <span class="o">=</span> <span class="n">input</span> + <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="c1">// group DataSet by first field</span> + <span class="o">.</span><span class="na">sortGroup</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">Order</span><span class="o">.</span><span class="na">ASCENDING</span><span class="o">)</span> <span class="c1">// sort groups on second tuple field</span> + <span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="nf">DistinctReduce</span><span class="o">());</span></code></pre></div> + + </div> + <div data-lang="scala"> + + <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span> +<span class="k">val</span> <span class="n">output</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">sortGroup</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="nc">Order</span><span class="o">.</span><span class="nc">ASCENDING</span><span class="o">).</span><span class="n">reduceGroup</span> <span class="o">{</span> + <span 
class="o">(</span><span class="n">in</span><span class="o">,</span> <span class="n">out</span><span class="k">:</span> <span 
class="kt">Collector</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)])</span> <span class="k">=></span> + <span class="k">var</span> <span class="n">prev</span><span class="k">:</span> <span class="o">(</span><span class="kt">Int</span><span class="o">,</span> <span class="kt">String</span><span class="o">)</span> <span class="k">=</span> <span class="kc">null</span> + <span class="k">for</span> <span class="o">(</span><span class="n">t</span> <span class="k"><-</span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span> + <span class="c1">// emit only if the element differs from the previous one</span> + <span class="k">if</span> <span class="o">(</span><span class="n">prev</span> <span class="o">==</span> <span class="kc">null</span> <span class="o">||</span> <span class="n">prev</span> <span class="o">!=</span> <span class="n">t</span><span class="o">)</span> <span class="o">{</span> + <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">t</span><span class="o">)</span> + <span class="n">prev</span> <span class="k">=</span> <span class="n">t</span> + <span class="o">}</span> + <span class="o">}</span> + <span class="o">}</span></code></pre></div> + + </div> + <div data-lang="python"> + + <div class="highlight"><pre><code class="language-python"> <span class="k">class</span> <span class="nc">DistinctReduce</span><span class="p">(</span><span class="n">GroupReduceFunction</span><span class="p">):</span> + <span class="k">def</span> <span class="nf">reduce</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">):</span> + <span class="n">dic</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">()</span> + <span class="k">for</span> <span class="n">value</span> <span 
class="ow">in</span> <span class="n">iterator</span><span class="p">:</span> + <span class="n">dic</span><span class="p">[</span><span class="n">value</span><span class="p">[</span><span class="mi">1</span><span 
class="p">]]</span> <span class="o">=</span> <span class="mi">1</span> + <span class="k">for</span> <span class="n">key</span> <span class="ow">in</span> <span class="n">dic</span><span class="o">.</span><span class="n">keys</span><span class="p">():</span> + <span class="n">collector</span><span class="o">.</span><span class="n">collect</span><span class="p">(</span><span class="n">key</span><span class="p">)</span> + + <span class="n">output</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span><span class="o">.</span><span class="n">sort_group</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="n">Order</span><span class="o">.</span><span class="n">ASCENDING</span><span class="p">)</span><span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="n">DistinctReduce</span><span class="p">(),</span> <span class="n">STRING</span><span class="p">)</span></code></pre></div> + + </div> +</div> + +<p><strong>Note:</strong> A GroupSort often comes for free if the grouping is established using a sort-based execution strategy of an operator before the reduce operation.</p> + +<h4 id="combinable-groupreducefunctions">Combinable GroupReduceFunctions</h4> + +<p>In contrast to a reduce function, a group-reduce function is not +necessarily combinable. 
In order to make a group-reduce function +combinable, you need to use the <code>RichGroupReduceFunction</code> variant, +implement (override) the <code>combine()</code> method, and annotate the +<code>RichGroupReduceFunction</code> with the <code>@Combinable</code> annotation as shown here:</p> + +<div class="codetabs"> + <div data-lang="java"> + + <div class="highlight"><pre><code class="language-java"><span class="c1">// Combinable GroupReduceFunction that computes two sums.</span> +<span class="c1">// Note that we use the RichGroupReduceFunction because it defines the combine method</span> +<span class="nd">@Combinable</span> +<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCombinableGroupReducer</span> + <span class="kd">extends</span> <span class="n">RichGroupReduceFunction</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">>,</span> + <span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="n">in</span><span class="o">,</span> + <span class="n">Collector</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span 
class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> + + <span class="n">String</span> <span class="n">key</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> + <span class="kt">int</span> <span class="n">intSum</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> + <span class="kt">double</span> <span class="n">doubleSum</span> <span class="o">=</span> <span class="mf">0.0</span><span class="o">;</span> + + <span class="k">for</span> <span class="o">(</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">></span> <span class="n">curr</span> <span class="o">:</span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span> + <span class="n">key</span> <span class="o">=</span> <span class="n">curr</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span> + <span class="n">intSum</span> <span class="o">+=</span> <span class="n">curr</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span> + <span class="n">doubleSum</span> <span class="o">+=</span> <span class="n">curr</span><span class="o">.</span><span class="na">f2</span><span class="o">;</span> + <span class="o">}</span> + <span class="c1">// emit a tuple with both sums</span> + <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">>(</span><span class="n">key</span><span class="o">,</span> <span class="n">intSum</span><span class="o">,</span> <span 
class="n">doubleSum</span><span class="o">));</span> + <span class="o">}</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">combine</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="n">in</span><span class="o">,</span> + <span class="n">Collector</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> + <span class="c1">// in some cases combine() calls can simply be forwarded to reduce().</span> + <span class="k">this</span><span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="n">in</span><span class="o">,</span> <span class="n">out</span><span class="o">);</span> + <span class="o">}</span> +<span class="o">}</span></code></pre></div> + + </div> + <div data-lang="scala"> + + <div class="highlight"><pre><code class="language-scala"><span class="c1">// Combinable GroupReduceFunction that computes two sums.</span> +<span class="c1">// Note that we use the RichGroupReduceFunction because it defines the combine method</span> +<span class="nd">@Combinable</span> +<span class="k">class</span> <span class="nc">MyCombinableGroupReducer</span> + <span class="k">extends</span> <span class="nc">RichGroupReduceFunction</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)</span>, <span class="o">(</span><span class="kt">String</span>, <span class="kt">Int</span>, 
<span class="kt">Double</span><span class="o">)]</span> <span class="o">{</span> + + <span class="k">override</span> <span class="k">def</span> <span class="n">reduce</span><span class="o">(</span> + <span class="n">in</span><span class="k">:</span> <span class="kt">java.lang.Iterable</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)],</span> + <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)])</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> + + <span class="k">var</span> <span class="n">key</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="kc">null</span> + <span class="k">var</span> <span class="n">intSum</span> <span class="k">=</span> <span class="mi">0</span> + <span class="k">var</span> <span class="n">doubleSum</span> <span class="k">=</span> <span class="mf">0.0</span> + + <span class="c1">// asScala requires: import scala.collection.JavaConverters._</span> + <span class="k">for</span> <span class="o">(</span><span class="n">curr</span> <span class="k"><-</span> <span class="n">in</span><span class="o">.</span><span class="n">asScala</span><span class="o">)</span> <span class="o">{</span> + <span class="n">key</span> <span class="k">=</span> <span class="n">curr</span><span class="o">.</span><span class="n">_1</span> + <span class="n">intSum</span> <span class="o">+=</span> <span class="n">curr</span><span class="o">.</span><span class="n">_2</span> + <span class="n">doubleSum</span> <span class="o">+=</span> <span class="n">curr</span><span class="o">.</span><span class="n">_3</span> + <span class="o">}</span> + <span class="c1">// emit a tuple with both sums</span> + <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">intSum</span><span class="o">,</span> <span class="n">doubleSum</span><span class="o">))</span> + <span class="o">}</span> + + <span class="k">override</span> <span class="k">def</span> <span class="n">combine</span><span class="o">(</span> + <span class="n">in</span><span class="k">:</span> <span class="kt">java.lang.Iterable</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)],</span> + <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)])</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> + <span class="c1">// in some cases combine() calls can simply be forwarded to reduce().</span> + <span class="k">this</span><span class="o">.</span><span class="n">reduce</span><span class="o">(</span><span class="n">in</span><span class="o">,</span> <span class="n">out</span><span class="o">)</span> + <span class="o">}</span> +<span class="o">}</span></code></pre></div> + + </div> + <div data-lang="python"> + + <div class="highlight"><pre><code class="language-python"> <span class="k">class</span> <span class="nc">GroupReduce</span><span class="p">(</span><span class="n">GroupReduceFunction</span><span class="p">):</span> + <span class="k">def</span> <span class="nf">reduce</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">):</span> + <span class="n">key</span><span class="p">,</span> <span class="n">int_sum</span><span class="p">,</span> <span class="n">float_sum</span> <span class="o">=</span> <span class="n">iterator</span><span class="o">.</span><span class="n">next</span><span class="p">()</span> + <span class="k">for</span> <span class="n">value</span> <span class="ow">in</span> 
<span class="n">iterator</span><span class="p">:</span> + <span class="n">int_sum</span> <span class="o">+=</span> <span class="n">value</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> + <span class="n">float_sum</span> <span class="o">+=</span> <span class="n">value</span><span class="p">[</span><span class="mi">2</span><span class="p">]</span> + <span class="n">collector</span><span class="o">.</span><span class="n">collect</span><span class="p">((</span><span class="n">key</span><span class="p">,</span> <span class="n">int_sum</span><span class="p">,</span> <span class="n">float_sum</span><span class="p">))</span> + <span class="c"># in some cases combine() calls can simply be forwarded to reduce().</span> + <span class="k">def</span> <span class="nf">combine</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">):</span> + <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">reduce</span><span class="p">(</span><span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">)</span> + +<span class="n">data</span><span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="n">GroupReduce</span><span class="p">(),</span> <span class="p">(</span><span class="n">STRING</span><span class="p">,</span> <span class="n">INT</span><span class="p">,</span> <span class="n">FLOAT</span><span class="p">),</span> <span class="n">combinable</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span></code></pre></div> + + </div> +</div> + +<h3 id="groupcombine-on-a-grouped-dataset">GroupCombine on a Grouped DataSet</h3> + +<p>The GroupCombine transformation is the generalized form of the combine step in +the Combinable GroupReduceFunction. 
It is generalized in the sense that it +allows combining input type <code>I</code> into an arbitrary output type <code>O</code>. In contrast, +the combine step in the GroupReduce only allows combining from input type <code>I</code> to +output type <code>I</code>, because the reduce step in the GroupReduceFunction +expects input type <code>I</code>.</p> + +<p>In some applications, it is desirable to combine a DataSet into an intermediate +format before performing additional transformations (e.g. to reduce data +size). This can be achieved with a CombineGroup transformation at very little +cost.</p> + +<p><strong>Note:</strong> The GroupCombine on a Grouped DataSet is performed in memory with a + greedy strategy which may not process all data at once but in multiple + steps. It is also performed on the individual partitions, without the data + exchange that a GroupReduce transformation performs. This may lead to partial + results, i.e. the same key may be emitted more than once.</p> + +<p>The following example demonstrates the use of a CombineGroup transformation for +an alternative WordCount implementation. 
In the implementation, words are first combined into partial counts, which the GroupReduce transformation then adds up.</p> + +<div class="codetabs"> + <div data-lang="java"> + + <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">input</span> <span class="o">=</span> <span class="o">[..]</span> <span class="c1">// The words received as input</span> +<span class="c1">// group identical words; a KeySelector is used because field position keys require a Tuple DataSet</span> +<span class="n">UnsortedGrouping</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">groupedInput</span> <span class="o">=</span> <span class="n">input</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="k">new</span> <span class="n">KeySelector</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> + <span class="kd">public</span> <span class="n">String</span> <span class="nf">getKey</span><span class="o">(</span><span class="n">String</span> <span class="n">word</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">word</span><span class="o">;</span> <span class="o">}</span> +<span class="o">});</span> + +<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">combinedWords</span> <span class="o">=</span> <span class="n">groupedInput</span><span class="o">.</span><span class="na">combineGroup</span><span class="o">(</span><span class="k">new</span> <span class="n">GroupCombineFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>>()</span> <span class="o">{</span> + + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">combine</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">words</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> <span class="c1">// combine</span> + <span class="n">String</span> <span class="n">key</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> + <span class="kt">int</span> <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> + <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span> + <span class="n">key</span> <span class="o">=</span> <span class="n">word</span><span class="o">;</span> + <span class="n">count</span><span class="o">++;</span> + <span class="o">}</span> + <span class="c1">// emit the word with its partial count</span> + <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>(</span><span class="n">key</span><span class="o">,</span> <span class="n">count</span><span class="o">));</span> + <span class="o">}</span> +<span class="o">});</span> + +<span class="n">UnsortedGrouping</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">groupedCombinedWords</span> <span class="o">=</span> <span class="n">combinedWords</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> <span class="c1">// group by words again</span> + +<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">output</span> <span class="o">=</span> <span class="n">groupedCombinedWords</span><span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="n">GroupReduceFunction</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>,</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>>()</span> <span class="o">{</span> <span class="c1">// group reduce with full data exchange</span> + + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">words</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> + <span class="n">String</span> <span class="n">key</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> + <span class="kt">int</span> <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> + <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">word</span> <span class="o">:</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span> + <span class="n">key</span> <span class="o">=</span> <span class="n">word</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span> + <span class="n">count</span> <span class="o">+=</span> <span class="n">word</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span> <span class="c1">// add up the partial counts</span> + <span class="o">}</span> + <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>(</span><span class="n">key</span><span class="o">,</span> <span class="n">count</span><span class="o">));</span> + <span class="o">}</span> +<span class="o">});</span></code></pre></div> + + </div> + <div data-lang="scala"> + + <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">[</span><span class="kt">..</span><span class="o">]</span> <span class="c1">// The words received as input</span> +<span class="k">val</span> <span class="n">groupedInput</span><span class="k">:</span> <span 
class="kt">GroupedDataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="n">word</span> <span class="k">=></span> <span class="n">word</span><span class="o">)</span> <span class="c1">// group identical words</span> + +<span class="k">val</span> <span class="n">combinedWords</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="n">groupedInput</span><span class="o">.</span><span class="n">combineGroup</span> <span class="o">{</span> + <span class="o">(</span><span class="n">words</span><span class="o">,</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)])</span> <span class="k">=></span> + <span class="k">var</span> <span class="n">key</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="kc">null</span> + <span class="k">var</span> <span class="n">count</span> <span class="k">=</span> <span class="mi">0</span> + <span class="k">for</span> <span class="o">(</span><span class="n">word</span> <span class="k"><-</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span> + <span class="n">key</span> <span class="k">=</span> <span class="n">word</span> + <span class="n">count</span> <span class="o">+=</span> <span class="mi">1</span> + <span class="o">}</span> + <span class="c1">// emit the word with its partial count</span> + <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">count</span><span class="o">))</span> +<span class="o">}</span> + +<span class="k">val</span> <span class="n">groupedCombinedWords</span><span class="k">:</span> <span class="kt">GroupedDataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="n">combinedWords</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="c1">// group by words again</span> + +<span class="k">val</span> <span class="n">output</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="n">groupedCombinedWords</span><span class="o">.</span><span class="n">reduceGroup</span> <span class="o">{</span> + <span class="o">(</span><span class="n">words</span><span class="o">,</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)])</span> <span class="k">=></span> + <span class="k">var</span> <span class="n">key</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="kc">null</span> + <span class="k">var</span> <span class="n">count</span> <span class="k">=</span> <span class="mi">0</span> + <span class="k">for</span> <span class="o">((</span><span class="n">word</span><span class="o">,</span> <span class="n">partialCount</span><span class="o">)</span> <span class="k"><-</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span> + <span class="n">key</span> <span class="k">=</span> <span class="n">word</span> + <span class="n">count</span> <span class="o">+=</span> <span class="n">partialCount</span> <span class="c1">// add up the partial counts</span> + <span class="o">}</span> + <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">count</span><span class="o">))</span> +<span class="o">}</span></code></pre></div> + + </div> +</div> + +<p>The above alternative WordCount implementation demonstrates how the GroupCombine +combines words before performing the GroupReduce transformation. The +example is just a proof of concept. Note how the combine step changes the type +of the DataSet, which would normally require an additional Map transformation +before executing the GroupReduce.</p> + +<h3 id="aggregate-on-grouped-tuple-dataset">Aggregate on Grouped Tuple DataSet</h3> + +<p>There are some common aggregation operations that are frequently used. 
The Aggregate transformation provides the following built-in aggregation functions:</p> + +<ul> + <li>Sum,</li> + <li>Min, and</li> + <li>Max.</li> +</ul> + +<p>The Aggregate transformation can only be applied on a Tuple DataSet and supports only field position keys for grouping.</p> + +<p>The following code shows how to apply an Aggregation transformation on a DataSet grouped by field position keys:</p> + +<div class="codetabs"> + <div data-lang="java"> + + <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span> +<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple3</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">>></span> <span class="n">output</span> <span class="o">=</span> <span class="n">input</span> + <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="c1">// group DataSet on second field</span> + <span class="o">.</span><span class="na">aggregate</span><span class="o">(</span><span class="n">SUM</span><span class="o">,</span> <span class="mi">0</span><span class="o">)</span> <span class="c1">// compute sum of the first field</span> + <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">MIN</span><span class="o">,</span> <span class="mi">2</span><span class="o">);</span> <span class="c1">// compute minimum of the third field</span></code></pre></div> + + </div> + <div data-lang="scala"> + + <div class="highlight"><pre><code 
class="language-scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span>, <span class="kt">Double<
<TRUNCATED>
