Regenerate website
Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/a163bcf4 Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/a163bcf4 Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/a163bcf4 Branch: refs/heads/asf-site Commit: a163bcf42026b00146a2e21df034d03f1efb2acc Parents: 4348e5f Author: Davor Bonaci <da...@google.com> Authored: Thu Apr 27 16:20:30 2017 -0700 Committer: Davor Bonaci <da...@google.com> Committed: Thu Apr 27 16:20:30 2017 -0700 ---------------------------------------------------------------------- .../documentation/programming-guide/index.html | 279 +++++++++++++++---- content/images/trigger-accumulation.png | Bin 0 -> 11144 bytes 2 files changed, 221 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam-site/blob/a163bcf4/content/documentation/programming-guide/index.html ---------------------------------------------------------------------- diff --git a/content/documentation/programming-guide/index.html b/content/documentation/programming-guide/index.html index 38f7bfc..439d48b 100644 --- a/content/documentation/programming-guide/index.html +++ b/content/documentation/programming-guide/index.html @@ -156,7 +156,7 @@ <p>The <strong>Beam Programming Guide</strong> is intended for Beam users who want to use the Beam SDKs to create data processing pipelines. It provides guidance for using the Beam SDK classes to build and test your pipeline. It is not intended as an exhaustive reference, but as a language-agnostic, high-level guide to programmatically building your Beam pipeline. 
As the programming guide is filled out, the text will include code samples in multiple languages to help illustrate how to implement Beam concepts in your pipelines.</p> <nav class="language-switcher"> - <strong>Adapt for:</strong> + <strong>Adapt for:</strong> <ul> <li data-type="language-java" class="active">Java SDK</li> <li data-type="language-py">Python SDK</li> @@ -215,7 +215,7 @@ <p><code class="highlighter-rouge">PCollection</code>: A <code class="highlighter-rouge">PCollection</code> represents a distributed data set that your Beam pipeline operates on. The data set can be <em>bounded</em>, meaning it comes from a fixed source like a file, or <em>unbounded</em>, meaning it comes from a continuously updating source via a subscription or other mechanism. Your pipeline typically creates an initial <code class="highlighter-rouge">PCollection</code> by reading data from an external data source, but you can also create a <code class="highlighter-rouge">PCollection</code> from in-memory data within your driver program. From there, <code class="highlighter-rouge">PCollection</code>s are the inputs and outputs for each step in your pipeline.</p> </li> <li> - <p><code class="highlighter-rouge">Transform</code>: A <code class="highlighter-rouge">Transform</code> represents a data processing operation, or a step, in your pipeline. Every <code class="highlighter-rouge">Transform</code> takes one or more <code class="highlighter-rouge">PCollection</code> objects as input, perfroms a processing function that you provide on the elements of that <code class="highlighter-rouge">PCollection</code>, and produces one or more output <code class="highlighter-rouge">PCollection</code> objects.</p> + <p><code class="highlighter-rouge">Transform</code>: A <code class="highlighter-rouge">Transform</code> represents a data processing operation, or a step, in your pipeline. 
Every <code class="highlighter-rouge">Transform</code> takes one or more <code class="highlighter-rouge">PCollection</code> objects as input, performs a processing function that you provide on the elements of that <code class="highlighter-rouge">PCollection</code>, and produces one or more output <code class="highlighter-rouge">PCollection</code> objects.</p> </li> <li> <p>I/O <code class="highlighter-rouge">Source</code> and <code class="highlighter-rouge">Sink</code>: Beam provides <code class="highlighter-rouge">Source</code> and <code class="highlighter-rouge">Sink</code> APIs to represent reading and writing data, respectively. <code class="highlighter-rouge">Source</code> encapsulates the code necessary to read data into your Beam pipeline from some external source, such as cloud file storage or a subscription to a streaming data source. <code class="highlighter-rouge">Sink</code> likewise encapsulates the code necessary to write the elements of a <code class="highlighter-rouge">PCollection</code> to an external data sink.</p> @@ -258,6 +258,7 @@ <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># Will parse the arguments passed into the application and construct a PipelineOptions object.</span> <span class="c"># Note that --help will print registered options.</span> +<span class="kn">import</span> <span class="nn">apache_beam</span> <span class="kn">as</span> <span class="nn">beam</span> <span class="kn">from</span> <span class="nn">apache_beam.utils.pipeline_options</span> <span class="kn">import</span> <span class="n">PipelineOptions</span> <span class="n">p</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">(</span><span class="n">options</span><span class="o">=</span><span class="n">PipelineOptions</span><span class="p">())</span> @@ -285,7 +286,7 @@ <div class="language-java highlighter-rouge"><pre class="highlight"><code><span 
class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="o">{</span> <span class="c1">// Create the pipeline.</span> - <span class="n">PipelineOptions</span> <span class="n">options</span> <span class="o">=</span> + <span class="n">PipelineOptions</span> <span class="n">options</span> <span class="o">=</span> <span class="n">PipelineOptionsFactory</span><span class="o">.</span><span class="na">fromArgs</span><span class="o">(</span><span class="n">args</span><span class="o">).</span><span class="na">create</span><span class="o">();</span> <span class="n">Pipeline</span> <span class="n">p</span> <span class="o">=</span> <span class="n">Pipeline</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="n">options</span><span class="o">);</span> @@ -321,7 +322,7 @@ <span class="s">"Or to take arms against a sea of troubles, "</span><span class="o">);</span> <span class="c1">// Create the pipeline.</span> - <span class="n">PipelineOptions</span> <span class="n">options</span> <span class="o">=</span> + <span class="n">PipelineOptions</span> <span class="n">options</span> <span class="o">=</span> <span class="n">PipelineOptionsFactory</span><span class="o">.</span><span class="na">fromArgs</span><span class="o">(</span><span class="n">args</span><span class="o">).</span><span class="na">create</span><span class="o">();</span> <span class="n">Pipeline</span> <span class="n">p</span> <span class="o">=</span> <span class="n">Pipeline</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="n">options</span><span class="o">);</span> @@ -333,15 +334,12 @@ <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">p</span> <span class="o">=</span> <span 
class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">(</span><span class="n">options</span><span class="o">=</span><span class="n">pipeline_options</span><span class="p">)</span> -<span class="p">(</span><span class="n">p</span> - <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span> - <span class="s">'To be, or not to be: that is the question: '</span><span class="p">,</span> - <span class="s">'Whether </span><span class="se">\'</span><span class="s">tis nobler in the mind to suffer '</span><span class="p">,</span> - <span class="s">'The slings and arrows of outrageous fortune, '</span><span class="p">,</span> - <span class="s">'Or to take arms against a sea of troubles, '</span><span class="p">])</span> - <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">WriteToText</span><span class="p">(</span><span class="n">my_options</span><span class="o">.</span><span class="n">output</span><span class="p">))</span> - -<span class="n">result</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> +<span class="n">lines</span> <span class="o">=</span> <span class="p">(</span><span class="n">p</span> + <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span> + <span class="s">'To be, or not to be: that is the question: '</span><span class="p">,</span> + <span class="s">'Whether </span><span class="se">\'</span><span class="s">tis nobler in the mind to suffer '</span><span class="p">,</span> + <span class="s">'The slings and arrows of outrageous fortune, '</span><span class="p">,</span> + <span class="s">'Or to take arms against a sea of troubles, '</span><span class="p">]))</span> </code></pre> </div> @@ -497,7 +495,7 @@ 
<span class="k">class</span> <span class="nc">ComputeWordLengthFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> <span class="k">return</span> <span class="p">[</span><span class="nb">len</span><span class="p">(</span><span class="n">element</span><span class="p">)]</span> - + <span class="c"># Apply a ParDo to the PCollection "words" to compute lengths for each word.</span> <span class="n">word_lengths</span> <span class="o">=</span> <span class="n">words</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">ComputeWordLengthFn</span><span class="p">())</span> </code></pre> @@ -776,20 +774,6 @@ guest, [[], [order4]] </div> <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">pc</span> <span class="o">=</span> <span class="o">...</span> -<span class="k">class</span> <span class="nc">AverageFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">CombineFn</span><span class="p">):</span> - <span class="k">def</span> <span class="nf">create_accumulator</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> - <span class="k">return</span> <span class="p">(</span><span class="mf">0.0</span><span class="p">,</span> <span class="mi">0</span><span class="p">)</span> - - <span class="k">def</span> <span class="nf">add_input</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="p">(</span><span class="nb">sum</span><span class="p">,</span> <span class="n">count</span><span class="p">),</span> <span class="nb">input</span><span class="p">):</span> - <span 
class="k">return</span> <span class="nb">sum</span> <span class="o">+</span> <span class="nb">input</span><span class="p">,</span> <span class="n">count</span> <span class="o">+</span> <span class="mi">1</span> - - <span class="k">def</span> <span class="nf">merge_accumulators</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">accumulators</span><span class="p">):</span> - <span class="n">sums</span><span class="p">,</span> <span class="n">counts</span> <span class="o">=</span> <span class="nb">zip</span><span class="p">(</span><span class="o">*</span><span class="n">accumulators</span><span class="p">)</span> - <span class="k">return</span> <span class="nb">sum</span><span class="p">(</span><span class="n">sums</span><span class="p">),</span> <span class="nb">sum</span><span class="p">(</span><span class="n">counts</span><span class="p">)</span> - - <span class="k">def</span> <span class="nf">extract_output</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="p">(</span><span class="nb">sum</span><span class="p">,</span> <span class="n">count</span><span class="p">)):</span> - <span class="k">return</span> <span class="nb">sum</span> <span class="o">/</span> <span class="n">count</span> <span class="k">if</span> <span class="n">count</span> <span class="k">else</span> <span class="nb">float</span><span class="p">(</span><span class="s">'NaN'</span><span class="p">)</span> -<span class="n">average</span> <span class="o">=</span> <span class="n">pc</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CombineGlobally</span><span class="p">(</span><span class="n">AverageFn</span><span class="p">())</span> </code></pre> </div> @@ -810,20 +794,6 @@ guest, [[], [order4]] <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># sum combines the elements in the input PCollection.</span> <span class="c"># 
The resulting PCollection, called result, contains one value: the sum of all the elements in the input PCollection.</span> <span class="n">pc</span> <span class="o">=</span> <span class="o">...</span> -<span class="k">class</span> <span class="nc">AverageFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">CombineFn</span><span class="p">):</span> - <span class="k">def</span> <span class="nf">create_accumulator</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> - <span class="k">return</span> <span class="p">(</span><span class="mf">0.0</span><span class="p">,</span> <span class="mi">0</span><span class="p">)</span> - - <span class="k">def</span> <span class="nf">add_input</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="p">(</span><span class="nb">sum</span><span class="p">,</span> <span class="n">count</span><span class="p">),</span> <span class="nb">input</span><span class="p">):</span> - <span class="k">return</span> <span class="nb">sum</span> <span class="o">+</span> <span class="nb">input</span><span class="p">,</span> <span class="n">count</span> <span class="o">+</span> <span class="mi">1</span> - - <span class="k">def</span> <span class="nf">merge_accumulators</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">accumulators</span><span class="p">):</span> - <span class="n">sums</span><span class="p">,</span> <span class="n">counts</span> <span class="o">=</span> <span class="nb">zip</span><span class="p">(</span><span class="o">*</span><span class="n">accumulators</span><span class="p">)</span> - <span class="k">return</span> <span class="nb">sum</span><span class="p">(</span><span class="n">sums</span><span class="p">),</span> <span class="nb">sum</span><span class="p">(</span><span class="n">counts</span><span class="p">)</span> - - <span class="k">def</span> <span 
class="nf">extract_output</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="p">(</span><span class="nb">sum</span><span class="p">,</span> <span class="n">count</span><span class="p">)):</span> - <span class="k">return</span> <span class="nb">sum</span> <span class="o">/</span> <span class="n">count</span> <span class="k">if</span> <span class="n">count</span> <span class="k">else</span> <span class="nb">float</span><span class="p">(</span><span class="s">'NaN'</span><span class="p">)</span> -<span class="n">average</span> <span class="o">=</span> <span class="n">pc</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CombineGlobally</span><span class="p">(</span><span class="n">AverageFn</span><span class="p">())</span> </code></pre> </div> @@ -915,9 +885,7 @@ guest, [[], [order4]] <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># Flatten takes a tuple of PCollection objects.</span> <span class="c"># Returns a single PCollection that contains all of the elements in the </span> <span class="n">merged</span> <span class="o">=</span> <span class="p">(</span> - <span class="c"># [START model_multiple_pcollections_tuple]</span> <span class="p">(</span><span class="n">pcoll1</span><span class="p">,</span> <span class="n">pcoll2</span><span class="p">,</span> <span class="n">pcoll3</span><span class="p">)</span> - <span class="c"># [END model_multiple_pcollections_tuple]</span> <span class="c"># A list of tuples can be "piped" directly into a Flatten transform.</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Flatten</span><span class="p">())</span> @@ -1346,7 +1314,7 @@ guest, [[], [order4]] <h4 id="using-a-read-transform">Using a read transform:</h4> -<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PCollection</span><span class="o"><</span><span 
class="n">String</span><span class="o">></span> <span class="n">lines</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">TextIO</span><span class="o">.</span><span class="na">Read</span><span class="o">.</span><span class="na">from</span><span class="o">(</span><span class="s">"gs://some/inputData.txt"</span><span class="o">));</span> +<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PCollection</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">lines</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">TextIO</span><span class="o">.</span><span class="na">Read</span><span class="o">.</span><span class="na">from</span><span class="o">(</span><span class="s">"gs://some/inputData.txt"</span><span class="o">));</span> </code></pre> </div> @@ -1641,9 +1609,9 @@ guest, [[], [order4]] <h2 id="a-namewindowingaworking-with-windowing"><a name="windowing"></a>Working with windowing</h2> -<p>Windowing subdivides a <code class="highlighter-rouge">PCollection</code> according to the timestamps of its individual elements. Transforms that aggregate multiple elements, such as <code class="highlighter-rouge">GroupByKey</code> and <code class="highlighter-rouge">Combine</code>, work implicitly on a per-window basis - that is, they process each <code class="highlighter-rouge">PCollection</code> as a succession of multiple, finite windows, though the entire collection itself may be of unbounded size.</p> +<p>Windowing subdivides a <code class="highlighter-rouge">PCollection</code> according to the timestamps of its individual elements. 
Transforms that aggregate multiple elements, such as <code class="highlighter-rouge">GroupByKey</code> and <code class="highlighter-rouge">Combine</code>, work implicitly on a per-window basis - they process each <code class="highlighter-rouge">PCollection</code> as a succession of multiple, finite windows, though the entire collection itself may be of unbounded size.</p> -<p>A related concept, called <strong>triggers</strong>, determines when to emit the results of aggregation as unbounded data arrives. Using a trigger can help to refine the windowing strategy for your <code class="highlighter-rouge">PCollection</code> to deal with late-arriving data or to provide early results. See the <a href="#triggers">triggers</a> section for more information.</p> +<p>A related concept, called <strong>triggers</strong>, determines when to emit the results of aggregation as unbounded data arrives. You can use triggers to refine the windowing strategy for your <code class="highlighter-rouge">PCollection</code>. Triggers allow you to deal with late-arriving data or to provide early results. See the <a href="#triggers">triggers</a> section for more information.</p> <h3 id="windowing-basics">Windowing basics</h3> @@ -1651,7 +1619,7 @@ guest, [[], [order4]] <p>In the Beam model, any <code class="highlighter-rouge">PCollection</code> (including unbounded <code class="highlighter-rouge">PCollection</code>s) can be subdivided into logical windows. Each element in a <code class="highlighter-rouge">PCollection</code> is assigned to one or more windows according to the <code class="highlighter-rouge">PCollection</code>’s windowing function, and each individual window contains a finite number of elements. Grouping transforms then consider each <code class="highlighter-rouge">PCollection</code>’s elements on a per-window basis. 
<code class="highlighter-rouge">GroupByKey</code>, for example, implicitly groups the elements of a <code class="highlighter-rouge">PCollection</code> by <em>key and window</em>.</p> -<p><strong>Caution:</strong> The default windowing behavior is to assign all elements of a <code class="highlighter-rouge">PCollection</code> to a single, global window, <em>even for unbounded <code class="highlighter-rouge">PCollection</code>s</em>. Before you use a grouping transform such as <code class="highlighter-rouge">GroupByKey</code> on an unbounded <code class="highlighter-rouge">PCollection</code>, you must do at least one of the following:</p> +<p><strong>Caution:</strong> Beam’s default windowing behavior is to assign all elements of a <code class="highlighter-rouge">PCollection</code> to a single, global window and discard late data, <em>even for unbounded <code class="highlighter-rouge">PCollection</code>s</em>. Before you use a grouping transform such as <code class="highlighter-rouge">GroupByKey</code> on an unbounded <code class="highlighter-rouge">PCollection</code>, you must do at least one of the following:</p> <ul> <li>Set a non-global windowing function. See <a href="#setwindowingfunction">Setting your PCollection’s windowing function</a>.</li> <li>Set a non-default <a href="#triggers">trigger</a>. This allows the global window to emit results under other conditions, since the default windowing behavior (waiting for all data to arrive) will never occur.</li> @@ -1739,9 +1707,9 @@ Subsequent transforms, however, are applied to the result of the <code class="hi <h4 id="single-global-window">Single global window</h4> -<p>By default, all data in a <code class="highlighter-rouge">PCollection</code> is assigned to a single global window. 
If your data set is of a fixed size, you can leave the global window default for your <code class="highlighter-rouge">PCollection</code>.</p> +<p>By default, all data in a <code class="highlighter-rouge">PCollection</code> is assigned to a single global window, and late data is discarded. If your data set is of a fixed size, you can use the global window default for your <code class="highlighter-rouge">PCollection</code>.</p> -<p>You can use a single global window if you are working with an unbounded data set, e.g. from a streaming data source; however, use caution when applying aggregating transforms such as <code class="highlighter-rouge">GroupByKey</code> and <code class="highlighter-rouge">Combine</code>. A single global window with a default trigger generally requires the entire data set to be available before processing, which is not possible with continuously updating data. To perform aggregations on an unbounded <code class="highlighter-rouge">PCollection</code> that uses global windowing, you should specify a non-default trigger for that <code class="highlighter-rouge">PCollection</code>.</p> +<p>You can use a single global window if you are working with an unbounded data set (e.g. from a streaming data source) but use caution when applying aggregating transforms such as <code class="highlighter-rouge">GroupByKey</code> and <code class="highlighter-rouge">Combine</code>. A single global window with a default trigger generally requires the entire data set to be available before processing, which is not possible with continuously updating data. 
To perform aggregations on an unbounded <code class="highlighter-rouge">PCollection</code> that uses global windowing, you should specify a non-default trigger for that <code class="highlighter-rouge">PCollection</code>.</p> <h3 id="a-namesetwindowingfunctionasetting-your-pcollections-windowing-function"><a name="setwindowingfunction"></a>Setting your PCollection’s windowing function</h3> @@ -1749,7 +1717,7 @@ Subsequent transforms, however, are applied to the result of the <code class="hi <p>Beam provides pre-defined <code class="highlighter-rouge">WindowFn</code>s for the basic windowing functions described here. You can also define your own <code class="highlighter-rouge">WindowFn</code> if you have a more complex need.</p> -<p>When setting a windowing function, you may also want to set a trigger for your <code class="highlighter-rouge">PCollection</code>. The trigger determines when each individual window is aggregated and emitted, and helps refine how the windowing function performs with respect to late data and computing early results. See the <a href="#triggers">triggers</a> section for more information.</p> +<p>When you set a windowing function, you may also want to set a trigger for your <code class="highlighter-rouge">PCollection</code>. The trigger determines when each individual window is aggregated and emitted, and helps refine how the windowing function performs with respect to late data and computing early results. 
See the <a href="#triggers">triggers</a> section for more information.</p> <h4 id="setting-fixed-time-windows">Setting fixed-time windows</h4> @@ -1814,7 +1782,7 @@ Subsequent transforms, however, are applied to the result of the <code class="hi </code></pre> </div> -<h3 id="time-skew-data-lag-and-late-data">Time skew, data lag, and late data</h3> +<h3 id="a-namewatermarks-late-dataawatermarks-and-late-data"><a name="watermarks-late-data"></a>Watermarks and late data</h3> <p>In any data processing system, there is a certain amount of lag between the time a data event occurs (the “event time”, determined by the timestamp on the data element itself) and the time the actual data element gets processed at any stage in your pipeline (the “processing time”, determined by the clock on the system processing the element). In addition, there are no guarantees that data events will appear in your pipeline in the same order that they were generated.</p> @@ -1824,12 +1792,14 @@ Subsequent transforms, however, are applied to the result of the <code class="hi <p>From our example, suppose we have a simple watermark that assumes approximately 30s of lag time between the data timestamps (the event time) and the time the data appears in the pipeline (the processing time), then Beam would close the first window at 5:30. If a data record arrives at 5:34, but with a timestamp that would put it in the 0:00-4:59 window (say, 3:38), then that record is late data.</p> -<p>Note: For simplicity, we’ve assumed that we’re using a very straightforward watermark that estimates the lag time/time skew. In practice, your <code class="highlighter-rouge">PCollection</code>’s data source determines the watermark, and watermarks can be more precise or complex.</p> +<p>Note: For simplicity, we’ve assumed that we’re using a very straightforward watermark that estimates the lag time. 
In practice, your <code class="highlighter-rouge">PCollection</code>’s data source determines the watermark, and watermarks can be more precise or complex.</p> + +<p>Beam’s default windowing configuration tries to determine when all data has arrived (based on the type of data source) and then advances the watermark past the end of the window. This default configuration does <em>not</em> allow late data. <a href="#triggers">Triggers</a> allow you to modify and refine the windowing strategy for a <code class="highlighter-rouge">PCollection</code>. You can use triggers to decide when each individual window aggregates and reports its results, including how the window emits late elements.</p> -<h4 id="managing-time-skew-and-late-data">Managing time skew and late data</h4> +<h4 id="managing-late-data">Managing late data</h4> <blockquote> - <p><strong>Note:</strong> Managing time skew and late data is not supported in the Beam SDK for Python.</p> + <p><strong>Note:</strong> Managing late data is not supported in the Beam SDK for Python.</p> </blockquote> <p>You can allow late data by invoking the <code class="highlighter-rouge">.withAllowedLateness</code> operation when you set your <code class="highlighter-rouge">PCollection</code>’s windowing strategy. The following code example demonstrates a windowing strategy that will allow late data up to two days after the end of a window.</p> @@ -1842,8 +1812,6 @@ Subsequent transforms, however, are applied to the result of the <code class="hi <p>When you set <code class="highlighter-rouge">.withAllowedLateness</code> on a <code class="highlighter-rouge">PCollection</code>, that allowed lateness propagates forward to any subsequent <code class="highlighter-rouge">PCollection</code> derived from the first <code class="highlighter-rouge">PCollection</code> you applied allowed lateness to. 
If you want to change the allowed lateness later in your pipeline, you must do so explicitly by applying <code class="highlighter-rouge">Window.withAllowedLateness()</code> again.</p> -<p>You can also use triggers to help you refine the windowing strategy for a <code class="highlighter-rouge">PCollection</code>. You can use triggers to determine exactly when each individual window aggregates and reports its results, including how the window emits late elements.</p> - <h3 id="adding-timestamps-to-a-pcollections-elements">Adding timestamps to a PCollection’s elements</h3> <p>An unbounded source provides a timestamp for each element. Depending on your unbounded source, you may need to configure how the timestamp is extracted from the raw data stream.</p> @@ -1885,9 +1853,204 @@ Subsequent transforms, however, are applied to the result of the <code class="hi <h2 id="a-nametriggersaworking-with-triggers"><a name="triggers"></a>Working with triggers</h2> <blockquote> - <p><strong>Note:</strong> This guide is still in progress. There is an open issue to finish the guide (<a href="https://issues.apache.org/jira/browse/BEAM-193">BEAM-193</a>)</p> + <p><strong>NOTE:</strong> This content applies only to the Beam SDK for Java. The Beam SDK for Python does not support triggers.</p> </blockquote> +<p>When collecting and grouping data into windows, Beam uses <strong>triggers</strong> to determine when to emit the aggregated results of each window (referred to as a <em>pane</em>). If you use Beam’s default windowing configuration and <a href="#default-trigger">default trigger</a>, Beam outputs the aggregated result when it <a href="#watermarks-late-data">estimates all data has arrived</a>, and discards all subsequent data for that window.</p> + +<p>You can set triggers for your <code class="highlighter-rouge">PCollection</code>s to change this default behavior. Beam provides a number of pre-built triggers that you can set:</p> + +<ul> + <li><strong>Event time triggers</strong>. 
These triggers operate on the event time, as indicated by the timestamp on each data element. Beam’s default trigger is event time-based.</li> + <li><strong>Processing time triggers</strong>. These triggers operate on the processing time - the time when the data element is processed at any given stage in the pipeline.</li> + <li><strong>Data-driven triggers</strong>. These triggers operate by examining the data as it arrives in each window, and firing when that data meets a certain property. Currently, data-driven triggers only support firing after a certain number of data elements.</li> + <li><strong>Composite triggers</strong>. These triggers combine multiple triggers in various ways.</li> +</ul> + +<p>At a high level, triggers provide two additional capabilities compared to simply outputting at the end of a window:</p> + +<ul> + <li>Triggers allow Beam to emit early results, before all the data in a given window has arrived. For example, emitting after a certain amount of time elapses, or after a certain number of elements arrives.</li> + <li>Triggers allow processing of late data by triggering after the event time watermark passes the end of the window.</li> +</ul> + +<p>These capabilities allow you to control the flow of your data and balance between different factors depending on your use case:</p> + +<ul> + <li><strong>Completeness:</strong> How important is it to have all of your data before you compute your result?</li> + <li><strong>Latency:</strong> How long do you want to wait for data? For example, do you wait until you think you have all data? 
Do you process data as it arrives?</li> + <li><strong>Cost:</strong> How much compute power/money are you willing to spend to lower the latency?</li> +</ul> + +<p>For example, a system that requires time-sensitive updates might use a strict time-based trigger that emits a window every <em>N</em> seconds, valuing promptness over data completeness. 
A system that values data completeness more than the exact timing of results might choose to use Beam’s default trigger, which fires when the watermark passes the end of the window.</p> + +<p>You can also set a trigger for an unbounded <code class="highlighter-rouge">PCollection</code> that uses a <a href="#windowing">single global window for its windowing function</a>. This can be useful when you want your pipeline to provide periodic updates on an unbounded data set, for example a running average of all data provided to the present time, updated every N seconds or every N elements.</p> + +<h4 id="event-time-triggers">Event Time Triggers</h4> + +<p>The <code class="highlighter-rouge">AfterWatermark</code> trigger operates on <em>event time</em>. The <code class="highlighter-rouge">AfterWatermark</code> trigger emits the contents of a window after the <a href="#watermarks-late-data">watermark</a> passes the end of the window, based on the timestamps attached to the data elements. The watermark is a global progress metric, and is Beam’s notion of input completeness within your pipeline at any given point. 
<code class="highlighter-rouge">AfterWatermark.pastEndOfWindow()</code> <em>only</em> fires when the watermark passes the end of the window.</p> + +<p>In addition, you can use <code class="highlighter-rouge">.withEarlyFirings(trigger)</code> and <code class="highlighter-rouge">.withLateFirings(trigger)</code> to configure additional triggers that fire before or after the watermark passes the end of the window.</p> + +<p>The following example shows a billing scenario, and uses both early and late firings:</p> +<div class="language-java highlighter-rouge"><pre class="highlight"><code> <span class="c1">// Create a bill at the end of the month.</span> + <span class="n">AfterWatermark</span><span class="o">.</span><span class="na">pastEndOfWindow</span><span class="o">()</span> + <span class="c1">// During the month, get near real-time estimates.</span> + <span class="o">.</span><span class="na">withEarlyFirings</span><span class="o">(</span> + <span class="n">AfterProcessingTime</span> + <span class="o">.</span><span class="na">pastFirstElementInPane</span><span class="o">()</span> + <span class="o">.</span><span class="na">plusDelayOf</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="mi">1</span><span class="o">)))</span> + <span class="c1">// Fire on any late data so the bill can be corrected.</span> + <span class="o">.</span><span class="na">withLateFirings</span><span class="o">(</span><span class="n">AfterPane</span><span class="o">.</span><span class="na">elementCountAtLeast</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span> +</code></pre> +</div> + +<div class="language-py highlighter-rouge"><pre class="highlight"><code> <span class="n">The</span> <span class="n">Beam</span> <span class="n">SDK</span> <span class="k">for</span> <span class="n">Python</span> <span class="n">does</span> <span class="ow">not</span> <span 
class="n">support</span> <span class="n">triggers</span><span class="o">.</span> +</code></pre> +</div> + +<h5 id="a-namedefault-triggeradefault-trigger"><a name="default-trigger"></a><strong>Default Trigger</strong></h5> + +<p>The default trigger for a <code class="highlighter-rouge">PCollection</code> is based on event time, and emits the results of the window when Beam’s watermark passes the end of the window, and then fires each time late data arrives.</p> + +<p>However, if you are using both the default windowing configuration and the default trigger, the default trigger emits exactly once, and late data is discarded. This is because the default windowing configuration has an allowed lateness value of 0. See the Handling Late Data section for information about modifying this behavior.</p> + +<h4 id="processing-time-triggers">Processing Time Triggers</h4> + +<p>The <code class="highlighter-rouge">AfterProcessingTime</code> trigger operates on <em>processing time</em>. For example, the <code class="highlighter-rouge">AfterProcessingTime.pastFirstElementInPane()</code> trigger emits a window after a certain amount of processing time has passed since the first element in the pane was received. The processing time is determined by the system clock, rather than the data element’s timestamp.</p> + +<p>The <code class="highlighter-rouge">AfterProcessingTime</code> trigger is useful for triggering early results from a window, particularly a window with a large time frame such as a single global window.</p> + +<h4 id="data-driven-triggers">Data-Driven Triggers</h4> + +<p>Beam provides one data-driven trigger, <code class="highlighter-rouge">AfterPane.elementCountAtLeast()</code>. This trigger works on an element count; it fires after the current pane has collected at least <em>N</em> elements. 
This allows a window to emit early results (before all the data has accumulated), which can be particularly useful if you are using a single global window.</p> + +<p>It is important to note that if, for example, you use <code class="highlighter-rouge">.elementCountAtLeast(50)</code> and only 32 elements arrive, the trigger never fires and those 32 elements are never emitted. If the 32 elements are important to you, consider using <a href="#composite-triggers">composite triggers</a> to combine multiple conditions. This allows you to specify multiple firing conditions such as “fire either when I receive 50 elements, or every 1 second”.</p> + +<h3 id="setting-a-trigger">Setting a Trigger</h3> + +<p>When you set a windowing function for a <code class="highlighter-rouge">PCollection</code> by using the <code class="highlighter-rouge">Window</code> transform, you can also specify a trigger.</p> + +<p>You set the trigger(s) for a <code class="highlighter-rouge">PCollection</code> by invoking the method <code class="highlighter-rouge">.triggering()</code> on the result of your <code class="highlighter-rouge">Window.into()</code> transform, as follows:</p> + +<div class="language-java highlighter-rouge"><pre class="highlight"><code> <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">pc</span> <span class="o">=</span> <span class="o">...;</span> + <span class="n">pc</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Window</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">into</span><span class="o">(</span><span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="mi">1</span><span class="o">)))</span> + <span class="o">.</span><span 
class="na">triggering</span><span class="o">(</span><span class="n">AfterProcessingTime</span><span class="o">.</span><span class="na">pastFirstElementInPane</span><span class="o">()</span> + <span class="o">.</span><span class="na">plusDelayOf</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="mi">1</span><span class="o">)))</span> + <span class="o">.</span><span class="na">discardingFiredPanes</span><span class="o">());</span> +</code></pre> +</div> + +<p>This code sample sets a time-based trigger for a <code class="highlighter-rouge">PCollection</code>, which emits results one minute after the first element in that window has been processed. The last line in the code sample, <code class="highlighter-rouge">.discardingFiredPanes()</code>, sets the window’s <strong>accumulation mode</strong>.</p> + +<h4 id="window-accumulation-modes">Window Accumulation Modes</h4> + +<p>When you specify a trigger, you must also set the window’s <strong>accumulation mode</strong>. When a trigger fires, it emits the current contents of the window as a pane. Since a trigger can fire multiple times, the accumulation mode determines whether the system <em>accumulates</em> the window panes as the trigger fires, or <em>discards</em> them.</p> + +<p>To set a window to accumulate the panes that are produced when the trigger fires, invoke <code class="highlighter-rouge">.accumulatingFiredPanes()</code> when you set the trigger. To set a window to discard fired panes, invoke <code class="highlighter-rouge">.discardingFiredPanes()</code>.</p> + +<p>Let’s look at an example that uses a <code class="highlighter-rouge">PCollection</code> with fixed-time windowing and a data-driven trigger. 
This is something you might do if, for example, each window represented a ten-minute running average, but you wanted to display the current value of the average in a UI more frequently than every ten minutes. We’ll assume the following conditions:</p> + +<ul> + <li>The <code class="highlighter-rouge">PCollection</code> uses 10-minute fixed-time windows.</li> + <li>The <code class="highlighter-rouge">PCollection</code> has a repeating trigger that fires every time 3 elements arrive.</li> +</ul> + +<p>The following diagram shows data events for key X as they arrive in the PCollection and are assigned to windows. To keep the diagram a bit simpler, we’ll assume that the events all arrive in the pipeline in order.</p> + +<p><img src="/images/trigger-accumulation.png" alt="Diagram of data events for accumulating mode example" title="Data events for accumulating mode example" /></p> + +<h5 id="accumulating-mode">Accumulating Mode</h5> + +<p>If our trigger is set to <code class="highlighter-rouge">.accumulatingFiredPanes</code>, the trigger emits the following values each time it fires. 
Keep in mind that the trigger fires every time three elements arrive:</p> + +<div class="highlighter-rouge"><pre class="highlight"><code> First trigger firing: [5, 8, 3] + Second trigger firing: [5, 8, 3, 15, 19, 23] + Third trigger firing: [5, 8, 3, 15, 19, 23, 9, 13, 10] +</code></pre> +</div> + +<h5 id="discarding-mode">Discarding Mode</h5> + +<p>If our trigger is set to <code class="highlighter-rouge">.discardingFiredPanes</code>, the trigger emits the following values on each firing:</p> + +<div class="highlighter-rouge"><pre class="highlight"><code> First trigger firing: [5, 8, 3] + Second trigger firing: [15, 19, 23] + Third trigger firing: [9, 13, 10] +</code></pre> +</div> + +<h4 id="handling-late-data">Handling Late Data</h4> + +<p>If you want your pipeline to process data that arrives after the watermark passes the end of the window, you can apply an <em>allowed lateness</em> when you set your windowing configuration. This gives your trigger the opportunity to react to the late data. 
If allowed lateness is set, the default trigger will emit new results immediately whenever late data arrives.</p> + +<p>You set the allowed lateness by using <code class="highlighter-rouge">.withAllowedLateness()</code> when you set your windowing function:</p> + +<div class="language-java highlighter-rouge"><pre class="highlight"><code> <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">pc</span> <span class="o">=</span> <span class="o">...;</span> + <span class="n">pc</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Window</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">into</span><span class="o">(</span><span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="mi">1</span><span class="o">)))</span> + <span class="o">.</span><span class="na">triggering</span><span class="o">(</span><span class="n">AfterProcessingTime</span><span class="o">.</span><span class="na">pastFirstElementInPane</span><span class="o">()</span> + <span class="o">.</span><span class="na">plusDelayOf</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="mi">1</span><span class="o">)))</span> + <span class="o">.</span><span class="na">withAllowedLateness</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="mi">30</span><span class="o">)));</span> +</code></pre> +</div> + +<p>This allowed lateness propagates to all <code class="highlighter-rouge">PCollection</code>s derived as a result of applying transforms to the original <code 
class="highlighter-rouge">PCollection</code>. If you want to change the allowed lateness later in your pipeline, you can apply <code class="highlighter-rouge">Window.withAllowedLateness()</code> again, explicitly.</p> + +<h3 id="a-namecomposite-triggersacomposite-triggers"><a name="composite-triggers"></a>Composite Triggers</h3> + +<p>You can combine multiple triggers to form <strong>composite triggers</strong>, and can specify a trigger to emit results repeatedly, at most once, or under other custom conditions.</p> + +<h4 id="composite-trigger-types">Composite Trigger Types</h4> + +<p>Beam includes the following composite triggers:</p> + +<ul> + <li>You can add early firings or late firings to <code class="highlighter-rouge">AfterWatermark.pastEndOfWindow</code> via <code class="highlighter-rouge">.withEarlyFirings</code> and <code class="highlighter-rouge">.withLateFirings</code>.</li> + <li><code class="highlighter-rouge">Repeatedly.forever</code> specifies a trigger that executes forever. Any time the trigger’s conditions are met, it causes a window to emit results and then resets and starts over. It can be useful to combine <code class="highlighter-rouge">Repeatedly.forever</code> with <code class="highlighter-rouge">.orFinally</code> to specify a condition that causes the repeating trigger to stop.</li> + <li><code class="highlighter-rouge">AfterEach.inOrder</code> combines multiple triggers to fire in a specific sequence. Each time a trigger in the sequence emits a window, the sequence advances to the next trigger.</li> + <li><code class="highlighter-rouge">AfterFirst</code> takes multiple triggers and emits the first time <em>any</em> of its argument triggers is satisfied. This is equivalent to a logical OR operation for multiple triggers.</li> + <li><code class="highlighter-rouge">AfterAll</code> takes multiple triggers and emits when <em>all</em> of its argument triggers are satisfied. 
This is equivalent to a logical AND operation for multiple triggers.</li> + <li><code class="highlighter-rouge">orFinally</code> can serve as a final condition to cause any trigger to fire one final time and never fire again.</li> +</ul> + +<h4 id="composition-with-afterwatermarkpastendofwindow">Composition with AfterWatermark.pastEndOfWindow</h4> + +<p>Some of the most useful composite triggers fire a single time when Beam estimates that all the data has arrived (i.e., when the watermark passes the end of the window) combined with either, or both, of the following:</p> + +<ul> + <li>Speculative firings that precede the watermark passing the end of the window to allow faster processing of partial results.</li> + <li>Late firings that happen after the watermark passes the end of the window, to allow for handling late-arriving data.</li> +</ul> + +<p>You can express this pattern using <code class="highlighter-rouge">AfterWatermark.pastEndOfWindow</code>. For example, the trigger code below fires under the following conditions:</p> + +<ul> + <li>On Beam’s estimate that all the data has arrived (the watermark passes the end of the window)</li> + <li>Any time late data arrives, after a ten-minute delay</li> + <li>After two days, we assume no more data of interest will arrive, and the trigger stops executing</li> +</ul> + +<div class="language-java highlighter-rouge"><pre class="highlight"><code> <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Window</span> + <span class="o">.</span><span class="na">triggering</span><span class="o">(</span><span class="n">AfterWatermark</span> + <span class="o">.</span><span class="na">pastEndOfWindow</span><span class="o">()</span> + <span class="o">.</span><span class="na">withLateFirings</span><span class="o">(</span><span class="n">AfterProcessingTime</span> + <span class="o">.</span><span class="na">pastFirstElementInPane</span><span class="o">()</span> + <span 
class="o">.</span><span class="na">plusDelayOf</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="mi">10</span><span class="o">))))</span> + <span class="o">.</span><span class="na">withAllowedLateness</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardDays</span><span class="o">(</span><span class="mi">2</span><span class="o">)));</span> +</code></pre> +</div> + +<h4 id="other-composite-triggers">Other Composite Triggers</h4> + +<p>You can also build other sorts of composite triggers. The following example code shows a simple composite trigger that fires whenever the pane has at least 100 elements, or after a minute.</p> + +<div class="language-java highlighter-rouge"><pre class="highlight"><code> <span class="n">Repeatedly</span><span class="o">.</span><span class="na">forever</span><span class="o">(</span><span class="n">AfterFirst</span><span class="o">.</span><span class="na">of</span><span class="o">(</span> + <span class="n">AfterPane</span><span class="o">.</span><span class="na">elementCountAtLeast</span><span class="o">(</span><span class="mi">100</span><span class="o">),</span> + <span class="n">AfterProcessingTime</span><span class="o">.</span><span class="na">pastFirstElementInPane</span><span class="o">().</span><span class="na">plusDelayOf</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="mi">1</span><span class="o">))))</span> +</code></pre> +</div> + + </div> http://git-wip-us.apache.org/repos/asf/beam-site/blob/a163bcf4/content/images/trigger-accumulation.png ---------------------------------------------------------------------- diff --git a/content/images/trigger-accumulation.png b/content/images/trigger-accumulation.png new file mode 100644 index 0000000..8c0dca9 Binary 
files /dev/null and b/content/images/trigger-accumulation.png differ
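
The accumulating and discarding pane listings in the programming-guide text of this diff can be reproduced with a small plain-Java model. This is only a sketch under stated assumptions: it does not use the Beam SDK, the class `PaneModes` and the method `firePanes` are hypothetical names introduced here for illustration, and it assumes a single window, in-order arrival, and a repeating trigger that fires every 3 elements, as in the guide's example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, NOT part of the Beam SDK: models which elements each
// pane contains when a repeating element-count trigger fires on one window.
class PaneModes {

    // Returns one list per trigger firing. The trigger fires each time
    // countPerFiring new elements have arrived since the previous firing.
    static List<List<Integer>> firePanes(
            List<Integer> elements, int countPerFiring, boolean accumulating) {
        List<List<Integer>> panes = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        int sinceLastFiring = 0;
        for (Integer element : elements) {
            buffer.add(element);
            sinceLastFiring++;
            if (sinceLastFiring == countPerFiring) {
                // Emit a snapshot of the buffered window contents as a pane.
                panes.add(new ArrayList<>(buffer));
                sinceLastFiring = 0;
                if (!accumulating) {
                    // Discarding mode: already-fired elements are dropped.
                    buffer.clear();
                }
            }
        }
        return panes;
    }

    public static void main(String[] args) {
        // Element values taken from the guide's example for key X.
        List<Integer> events = Arrays.asList(5, 8, 3, 15, 19, 23, 9, 13, 10);
        System.out.println("accumulating: " + firePanes(events, 3, true));
        System.out.println("discarding:   " + firePanes(events, 3, false));
    }
}
```

In accumulating mode each pane repeats everything the window has seen so far, so a downstream consumer can overwrite its previous result. In discarding mode each pane carries only the new elements, so the consumer has to combine panes itself.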