Regenerate website

Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/fa7d6168
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/fa7d6168
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/fa7d6168

Branch: refs/heads/asf-site
Commit: fa7d61680afb67248c2b9ae55417bc0ce7148c9b
Parents: ddb6079
Author: Davor Bonaci <da...@google.com>
Authored: Mon Mar 20 14:56:42 2017 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Mar 20 14:56:42 2017 -0700

----------------------------------------------------------------------
 .../documentation/programming-guide/index.html  | 248 ++++++++++++++++++-
 content/images/fixed-time-windows.png           | Bin 0 -> 11717 bytes
 content/images/session-windows.png              | Bin 0 -> 16697 bytes
 content/images/sliding-time-windows.png         | Bin 0 -> 16537 bytes
 content/images/unwindowed-pipeline-bounded.png  | Bin 0 -> 9589 bytes
 content/images/windowing-pipeline-bounded.png   | Bin 0 -> 13325 bytes
 content/images/windowing-pipeline-unbounded.png | Bin 0 -> 21890 bytes
 7 files changed, 245 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/fa7d6168/content/documentation/programming-guide/index.html
----------------------------------------------------------------------
diff --git a/content/documentation/programming-guide/index.html 
b/content/documentation/programming-guide/index.html
index 19853df..9d0a3b6 100644
--- a/content/documentation/programming-guide/index.html
+++ b/content/documentation/programming-guide/index.html
@@ -369,7 +369,7 @@
 
 <p>The bounded (or unbounded) nature of your <code 
class="highlighter-rouge">PCollection</code> affects how Beam processes your 
data. A bounded <code class="highlighter-rouge">PCollection</code> can be 
processed using a batch job, which might read the entire data set once, and 
perform processing in a job of finite length. An unbounded <code 
class="highlighter-rouge">PCollection</code> must be processed using a 
streaming job that runs continuously, as the entire collection can never be 
available for processing at any one time.</p>
 
-<p>When performing an operation that groups elements in an unbounded <code 
class="highlighter-rouge">PCollection</code>, Beam requires a concept called 
<strong>Windowing</strong> to divide a continuously updating data set into 
logical windows of finite size.  Beam processes each window as a bundle, and 
processing continues as the data set is generated. These logical windows are 
determined by some characteristic associated with a data element, such as a 
<strong>timestamp</strong>.</p>
+<p>When performing an operation that groups elements in an unbounded <code 
class="highlighter-rouge">PCollection</code>, Beam requires a concept called 
<strong>windowing</strong> to divide a continuously updating data set into 
logical windows of finite size.  Beam processes each window as a bundle, and 
processing continues as the data set is generated. These logical windows are 
determined by some characteristic associated with a data element, such as a 
<strong>timestamp</strong>.</p>
 
 <h4 id="a-namepctimestampsaelement-timestamps"><a 
name="pctimestamps"></a>Element timestamps</h4>
 
@@ -1522,8 +1522,250 @@ tree, [2]
 
 <p class="language-py">The Beam SDK for Python does not support annotating 
data types with a default coder. If you would like to set a default coder, use 
the method described in the previous section, <em>Setting the default coder for 
a type</em>.</p>
 
-<p><a name="windowing"></a>
-<a name="triggers"></a></p>
+<h2 id="a-namewindowingaworking-with-windowing"><a 
name="windowing"></a>Working with windowing</h2>
+
+<p>Windowing subdivides a <code class="highlighter-rouge">PCollection</code> 
according to the timestamps of its individual elements. Transforms that 
aggregate multiple elements, such as <code 
class="highlighter-rouge">GroupByKey</code> and <code 
class="highlighter-rouge">Combine</code>, work implicitly on a per-window 
basis—that is, they process each <code 
class="highlighter-rouge">PCollection</code> as a succession of multiple, 
finite windows, though the entire collection itself may be of unbounded 
size.</p>
+
+<p>A related concept, called <strong>triggers</strong>, determines when to 
emit the results of aggregation as unbounded data arrives. Using a trigger can 
help to refine the windowing strategy for your <code 
class="highlighter-rouge">PCollection</code> to deal with late-arriving data or 
to provide early results. See the <a href="#triggers">triggers</a> section for 
more information.</p>
+
+<h3 id="windowing-basics">Windowing basics</h3>
+
+<p>Some Beam transforms, such as <code 
class="highlighter-rouge">GroupByKey</code> and <code 
class="highlighter-rouge">Combine</code>, group multiple elements by a common 
key. Ordinarily, that grouping operation groups all of the elements that have 
the same key within the entire data set. With an unbounded data set, it is 
impossible to collect all of the elements, since new elements are constantly 
being added and may be infinitely many (e.g. streaming data). If you are 
working with unbounded <code class="highlighter-rouge">PCollection</code>s, 
windowing is especially useful.</p>
+
+<p>In the Beam model, any <code class="highlighter-rouge">PCollection</code> 
(including unbounded <code class="highlighter-rouge">PCollection</code>s) can 
be subdivided into logical windows. Each element in a <code 
class="highlighter-rouge">PCollection</code> is assigned to one or more windows 
according to the <code class="highlighter-rouge">PCollection</code>’s 
windowing function, and each individual window contains a finite number of 
elements. Grouping transforms then consider each <code 
class="highlighter-rouge">PCollection</code>’s elements on a per-window 
basis. <code class="highlighter-rouge">GroupByKey</code>, for example, 
implicitly groups the elements of a <code 
class="highlighter-rouge">PCollection</code> by <em>key and window</em>.</p>
+
+<p><strong>Caution:</strong> The default windowing behavior is to assign all 
elements of a <code class="highlighter-rouge">PCollection</code> to a single, 
global window, <em>even for unbounded <code 
class="highlighter-rouge">PCollection</code>s</em>. Before you use a grouping 
transform such as <code class="highlighter-rouge">GroupByKey</code> on an 
unbounded <code class="highlighter-rouge">PCollection</code>, you must do at 
least one of the following:</p>
+<ul>
+  <li>Set a non-global windowing function. See <a 
href="#setwindowingfunction">Setting your PCollection’s windowing 
function</a>.</li>
+  <li>Set a non-default <a href="#triggers">trigger</a>. This allows the 
global window to emit results under other conditions, since the default 
windowing behavior (waiting for all data to arrive) will never occur.</li>
+</ul>
+
+<p>If you don’t set a non-global windowing function or a non-default trigger 
for your unbounded <code class="highlighter-rouge">PCollection</code> and 
subsequently use a grouping transform such as <code 
class="highlighter-rouge">GroupByKey</code> or <code 
class="highlighter-rouge">Combine</code>, your pipeline will generate an error 
upon construction and your job will fail.</p>
+
+<h4 id="windowing-constraints">Windowing constraints</h4>
+
+<p>After you set the windowing function for a <code 
class="highlighter-rouge">PCollection</code>, the elements’ windows are used 
the next time you apply a grouping transform to that <code 
class="highlighter-rouge">PCollection</code>. Window grouping occurs on an 
as-needed basis. If you set a windowing function using the <code 
class="highlighter-rouge">Window</code> transform, each element is assigned to 
a window, but the windows are not considered until <code 
class="highlighter-rouge">GroupByKey</code> or <code 
class="highlighter-rouge">Combine</code> aggregates across a window and key. 
This can have different effects on your pipeline.
+Consider the example pipeline in the figure below:</p>
+
+<p><img src="/images/windowing-pipeline-unbounded.png" alt="Diagram of 
pipeline applying windowing" title="Pipeline applying windowing" /></p>
+
+<p><strong>Figure:</strong> Pipeline applying windowing</p>
+
+<p>In the above pipeline, we create an unbounded <code 
class="highlighter-rouge">PCollection</code> by reading a set of key/value 
pairs using <code class="highlighter-rouge">KafkaIO</code>, and then apply a 
windowing function to that collection using the <code 
class="highlighter-rouge">Window</code> transform. We then apply a <code 
class="highlighter-rouge">ParDo</code> to the collection, and then later 
group the result of that <code class="highlighter-rouge">ParDo</code> using 
<code class="highlighter-rouge">GroupByKey</code>. The windowing function has 
no effect on the <code class="highlighter-rouge">ParDo</code> transform, 
because the windows are not actually used until they’re needed for the <code 
class="highlighter-rouge">GroupByKey</code>.
+Subsequent transforms, however, are applied to the result of the <code 
class="highlighter-rouge">GroupByKey</code> – data is grouped by both key and 
window.</p>
+
+<h4 id="using-windowing-with-bounded-pcollections">Using windowing with 
bounded PCollections</h4>
+
+<p>You can use windowing with fixed-size data sets in <strong>bounded</strong> 
<code class="highlighter-rouge">PCollection</code>s. However, note that 
windowing considers only the implicit timestamps attached to each element of a 
<code class="highlighter-rouge">PCollection</code>, and data sources that 
create fixed data sets (such as <code class="highlighter-rouge">TextIO</code>) 
assign the same timestamp to every element. This means that all the elements 
are by default part of a single, global window.</p>
+
+<p>To use windowing with fixed data sets, you can assign your own timestamps 
to each element. To assign timestamps to elements, use a <code 
class="highlighter-rouge">ParDo</code> transform with a <code 
class="highlighter-rouge">DoFn</code> that outputs each element with a new 
timestamp (for example, the <a 
href="/documentation/sdks/javadoc/0.6.0/index.html?org/apache/beam/sdk/transforms/WithTimestamps.html">WithTimestamps</a>
 transform in the Beam SDK for Java).</p>
+
+<p>To illustrate how windowing with a bounded <code 
class="highlighter-rouge">PCollection</code> can affect how your pipeline 
processes data, consider the following pipeline:</p>
+
+<p><img src="/images/unwindowed-pipeline-bounded.png" alt="Diagram of 
GroupByKey and ParDo without windowing, on a bounded collection" 
title="GroupByKey and ParDo without windowing, on a bounded collection" /></p>
+
+<p><strong>Figure:</strong> <code class="highlighter-rouge">GroupByKey</code> 
and <code class="highlighter-rouge">ParDo</code> without windowing, on a 
bounded collection.</p>
+
+<p>In the above pipeline, we create a bounded <code 
class="highlighter-rouge">PCollection</code> by reading a set of key/value 
pairs using <code class="highlighter-rouge">TextIO</code>. We then group the 
collection using <code class="highlighter-rouge">GroupByKey</code>, and apply a 
<code class="highlighter-rouge">ParDo</code> transform to the grouped <code 
class="highlighter-rouge">PCollection</code>. In this example, the <code 
class="highlighter-rouge">GroupByKey</code> creates a collection of unique 
keys, and then <code class="highlighter-rouge">ParDo</code> gets applied 
exactly once per key.</p>
+
+<p>Note that even if you don’t set a windowing function, there is still a 
window – all elements in your <code 
class="highlighter-rouge">PCollection</code> are assigned to a single global 
window.</p>
+
+<p>Now, consider the same pipeline, but using a windowing function:</p>
+
+<p><img src="/images/windowing-pipeline-bounded.png" alt="Diagram of 
GroupByKey and ParDo with windowing, on a bounded collection" title="GroupByKey 
and ParDo with windowing, on a bounded collection" /></p>
+
+<p><strong>Figure:</strong> <code class="highlighter-rouge">GroupByKey</code> 
and <code class="highlighter-rouge">ParDo</code> with windowing, on a bounded 
collection.</p>
+
+<p>As before, the pipeline creates a bounded <code 
class="highlighter-rouge">PCollection</code> of key/value pairs. We then set a 
<a href="#setwindowingfunction">windowing function</a> for that <code 
class="highlighter-rouge">PCollection</code>. The <code 
class="highlighter-rouge">GroupByKey</code> transform groups the elements of 
the <code class="highlighter-rouge">PCollection</code> by both key and window, 
based on the windowing function. The subsequent <code 
class="highlighter-rouge">ParDo</code> transform gets applied multiple times 
per key, once for each window.</p>
+
+<h3 id="windowing-functions">Windowing functions</h3>
+
+<p>You can define different kinds of windows to divide the elements of your 
<code class="highlighter-rouge">PCollection</code>. Beam provides several 
windowing functions, including:</p>
+
+<ul>
+  <li>Fixed Time Windows</li>
+  <li>Sliding Time Windows</li>
+  <li>Per-Session Windows</li>
+  <li>Single Global Window</li>
+  <li>Calendar-based Windows (not supported by the Beam SDK for Python)</li>
+</ul>
+
+<p>Note that each element can logically belong to more than one window, 
depending on the windowing function you use. Sliding time windowing, for 
example, creates overlapping windows wherein a single element can be assigned 
to multiple windows.</p>
+
+<h4 id="fixed-time-windows">Fixed time windows</h4>
+
+<p>The simplest form of windowing is using <strong>fixed time 
windows</strong>: given a timestamped <code 
class="highlighter-rouge">PCollection</code> which might be continuously 
updating, each window might capture (for example) all elements with timestamps 
that fall into a five-minute interval.</p>
+
+<p>A fixed time window represents a consistent-duration, non-overlapping time 
interval in the data stream. Consider windows with a five-minute duration: all 
of the elements in your unbounded <code 
class="highlighter-rouge">PCollection</code> with timestamp values from 0:00:00 
up to (but not including) 0:05:00 belong to the first window, elements with 
timestamp values from 0:05:00 up to (but not including) 0:10:00 belong to the 
second window, and so on.</p>
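The assignment rule described above can be sketched in a few lines of plain Python. This is only an illustration of the arithmetic, not the Beam implementation: the helper name `fixed_window` is hypothetical, and timestamps are assumed to be plain seconds counted from 0:00:00.

```python
def fixed_window(timestamp, size):
    """Return the half-open [start, end) fixed window that contains `timestamp`.

    Both arguments are in seconds. The window end is exclusive, matching the
    "up to (but not including)" wording in the text.
    """
    start = timestamp - (timestamp % size)
    return (start, start + size)


FIVE_MINUTES = 5 * 60

# 0:04:59 still falls in the first window; 0:05:00 starts the second one.
print(fixed_window(4 * 60 + 59, FIVE_MINUTES))
print(fixed_window(5 * 60, FIVE_MINUTES))
```

Because every timestamp maps to exactly one start value, each element belongs to exactly one fixed window.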
+
+<p><img src="/images/fixed-time-windows.png" alt="Diagram of fixed time 
windows, 30s in duration" title="Fixed time windows, 30s in duration" /></p>
+
+<p><strong>Figure:</strong> Fixed time windows, 30s in duration.</p>
+
+<h4 id="sliding-time-windows">Sliding time windows</h4>
+
+<p>A <strong>sliding time window</strong> also represents time intervals in 
the data stream; however, sliding time windows can overlap. For example, each 
window might capture five minutes’ worth of data, but a new window starts every 
ten seconds. The frequency with which sliding windows begin is called the 
<em>period</em>. Therefore, our example would have a window <em>duration</em> 
of five minutes and a <em>period</em> of ten seconds.</p>
+
+<p>Because multiple windows overlap, most elements in a data set will belong 
to more than one window. This kind of windowing is useful for taking running 
averages of data; using sliding time windows, you can compute a running average 
of the past five minutes’ worth of data, updated every ten seconds, in our 
example.</p>
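To make the overlap concrete, the following plain-Python sketch lists every sliding window that contains a given timestamp. It is an illustration under stated assumptions rather than Beam's own code: the helper name `sliding_windows` is hypothetical, times are in seconds, and windows are assumed to start on multiples of the period.

```python
def sliding_windows(timestamp, size, period):
    """Return all half-open [start, end) windows of length `size` that
    contain `timestamp`, assuming windows start on multiples of `period`."""
    # Latest window start that is not after the timestamp.
    start = timestamp - (timestamp % period)
    windows = []
    # Walk backwards one period at a time while the window still covers
    # the timestamp, that is, while start + size > timestamp.
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= period
    return windows


# Five-minute windows that begin every ten seconds, as in the example above.
windows = sliding_windows(timestamp=1000, size=5 * 60, period=10)
print(len(windows))
```

With a five-minute duration and a ten-second period, each element lands in duration / period = 30 windows, which is why sliding windows multiply the amount of grouped data.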
+
+<p><img src="/images/sliding-time-windows.png" alt="Diagram of sliding time 
windows, with 1 minute window duration and 30s window period" title="Sliding 
time windows, with 1 minute window duration and 30s window period" /></p>
+
+<p><strong>Figure:</strong> Sliding time windows, with 1 minute window 
duration and 30s window period.</p>
+
+<h4 id="session-windows">Session windows</h4>
+
+<p>A <strong>session window</strong> function defines windows that contain 
elements that are within a certain gap duration of another element. Session 
windowing applies on a per-key basis and is useful for data that is irregularly 
distributed with respect to time. For example, a data stream representing user 
mouse activity may have long periods of idle time interspersed with high 
concentrations of clicks. If data arrives after the minimum specified gap 
duration time, this initiates the start of a new window.</p>
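A simplified way to picture session windowing for a single key: give each element a provisional window that starts at its timestamp and lasts for the gap duration, then merge windows that overlap. The sketch below does this in plain Python. The helper name `session_windows` is hypothetical, times are in seconds, and the treatment of elements exactly one gap apart (they start a new session here) is an assumption of this sketch.

```python
def session_windows(timestamps, gap):
    """Group one key's timestamps into merged [start, end) session windows."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # The element falls inside the current session: extend it.
            start, _ = sessions[-1]
            sessions[-1] = (start, ts + gap)
        else:
            # The gap has elapsed: this element starts a new session.
            sessions.append((ts, ts + gap))
    return sessions


# A burst of clicks, a long idle period, then one more click.
print(session_windows([0, 30, 50, 1000], gap=60))
```

Running the same function separately for each key gives every key its own session boundaries, which is the per-key behavior described above.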
+
+<p><img src="/images/session-windows.png" alt="Diagram of session windows with 
a minimum gap duration" title="Session windows, with a minimum gap duration" 
/></p>
+
+<p><strong>Figure:</strong> Session windows, with a minimum gap duration. Note 
how each data key has different windows, according to its data distribution.</p>
+
+<h4 id="single-global-window">Single global window</h4>
+
+<p>By default, all data in a <code 
class="highlighter-rouge">PCollection</code> is assigned to a single global 
window. If your data set is of a fixed size, you can leave the global window 
default for your <code class="highlighter-rouge">PCollection</code>.</p>
+
+<p>You can use a single global window if you are working with an unbounded 
data set, e.g. from a streaming data source; however, use caution when applying 
aggregating transforms such as <code 
class="highlighter-rouge">GroupByKey</code> and <code 
class="highlighter-rouge">Combine</code>. A single global window with a default 
trigger generally requires the entire data set to be available before 
processing, which is not possible with continuously updating data. To perform 
aggregations on an unbounded <code class="highlighter-rouge">PCollection</code> 
that uses global windowing, you should specify a non-default trigger for that 
<code class="highlighter-rouge">PCollection</code>.</p>
+
+<h3 
id="a-namesetwindowingfunctionasetting-your-pcollections-windowing-function"><a 
name="setwindowingfunction"></a>Setting your PCollection’s windowing 
function</h3>
+
+<p>You can set the windowing function for a <code 
class="highlighter-rouge">PCollection</code> by applying the <code 
class="highlighter-rouge">Window</code> transform. When you apply the <code 
class="highlighter-rouge">Window</code> transform, you must provide a <code 
class="highlighter-rouge">WindowFn</code>. The <code 
class="highlighter-rouge">WindowFn</code> determines the windowing function 
your <code class="highlighter-rouge">PCollection</code> will use for subsequent 
grouping transforms, such as a fixed or sliding time window.</p>
+
+<p>Beam provides pre-defined <code class="highlighter-rouge">WindowFn</code>s 
for the basic windowing functions described here. You can also define your own 
<code class="highlighter-rouge">WindowFn</code> if you have a more complex 
need.</p>
+
+<p>When setting a windowing function, you may also want to set a trigger for 
your <code class="highlighter-rouge">PCollection</code>. The trigger determines 
when each individual window is aggregated and emitted, and helps refine how the 
windowing function performs with respect to late data and computing early 
results. See the <a href="#triggers">triggers</a> section for more 
information.</p>
+
+<h4 id="setting-fixed-time-windows">Setting fixed-time windows</h4>
+
+<p>The following example code shows how to apply <code 
class="highlighter-rouge">Window</code> to divide a <code 
class="highlighter-rouge">PCollection</code> into fixed windows, each one 
minute in length:</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>    
<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">items</span> 
<span class="o">=</span> <span class="o">...;</span>
+    <span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">fixed_windowed_items</span> <span class="o">=</span> <span 
class="n">items</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span>
+        <span class="n">Window</span><span class="o">.&lt;</span><span 
class="n">String</span><span class="o">&gt;</span><span 
class="n">into</span><span class="o">(</span><span 
class="n">FixedWindows</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">standardMinutes</span><span class="o">(</span><span 
class="mi">1</span><span class="o">))));</span>
+</code></pre>
+</div>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="kn">from</span> <span class="nn">apache_beam</span> <span 
class="kn">import</span> <span class="n">window</span>
+<span class="n">fixed_windowed_items</span> <span class="o">=</span> <span 
class="p">(</span>
+    <span class="n">items</span> <span class="o">|</span> <span 
class="s">'window'</span> <span class="o">&gt;&gt;</span> <span 
class="n">beam</span><span class="o">.</span><span 
class="n">WindowInto</span><span class="p">(</span><span 
class="n">window</span><span class="o">.</span><span 
class="n">FixedWindows</span><span class="p">(</span><span 
class="mi">60</span><span class="p">)))</span>
+
+</code></pre>
+</div>
+
+<h4 id="setting-sliding-time-windows">Setting sliding time windows</h4>
+
+<p>The following example code shows how to apply <code 
class="highlighter-rouge">Window</code> to divide a <code 
class="highlighter-rouge">PCollection</code> into sliding time windows. Each 
window is 30 minutes in length, and a new window begins every five seconds:</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>    
<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">items</span> 
<span class="o">=</span> <span class="o">...;</span>
+    <span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">sliding_windowed_items</span> <span class="o">=</span> <span 
class="n">items</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span>
+        <span class="n">Window</span><span class="o">.&lt;</span><span 
class="n">String</span><span class="o">&gt;</span><span 
class="n">into</span><span class="o">(</span><span 
class="n">SlidingWindows</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">standardMinutes</span><span class="o">(</span><span 
class="mi">30</span><span class="o">)).</span><span 
class="na">every</span><span class="o">(</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">standardSeconds</span><span class="o">(</span><span 
class="mi">5</span><span class="o">))));</span>
+</code></pre>
+</div>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="kn">from</span> <span class="nn">apache_beam</span> <span 
class="kn">import</span> <span class="n">window</span>
+<span class="n">sliding_windowed_items</span> <span class="o">=</span> <span 
class="p">(</span>
+    <span class="n">items</span> <span class="o">|</span> <span 
class="s">'window'</span> <span class="o">&gt;&gt;</span> <span 
class="n">beam</span><span class="o">.</span><span 
class="n">WindowInto</span><span class="p">(</span><span 
class="n">window</span><span class="o">.</span><span 
class="n">SlidingWindows</span><span class="p">(</span><span 
class="mi">30</span> <span class="o">*</span> <span class="mi">60</span><span class="p">,</span> <span class="mi">5</span><span 
class="p">)))</span>
+
+</code></pre>
+</div>
+
+<h4 id="setting-session-windows">Setting session windows</h4>
+
+<p>The following example code shows how to apply <code 
class="highlighter-rouge">Window</code> to divide a <code 
class="highlighter-rouge">PCollection</code> into session windows, where each 
session must be separated by a time gap of at least 10 minutes:</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>    
<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">items</span> 
<span class="o">=</span> <span class="o">...;</span>
+    <span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">session_windowed_items</span> <span class="o">=</span> <span 
class="n">items</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span>
+        <span class="n">Window</span><span class="o">.&lt;</span><span 
class="n">String</span><span class="o">&gt;</span><span 
class="n">into</span><span class="o">(</span><span 
class="n">Sessions</span><span class="o">.</span><span 
class="na">withGapDuration</span><span class="o">(</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">standardMinutes</span><span class="o">(</span><span 
class="mi">10</span><span class="o">))));</span>
+</code></pre>
+</div>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="kn">from</span> <span class="nn">apache_beam</span> <span 
class="kn">import</span> <span class="n">window</span>
+<span class="n">session_windowed_items</span> <span class="o">=</span> <span 
class="p">(</span>
+    <span class="n">items</span> <span class="o">|</span> <span 
class="s">'window'</span> <span class="o">&gt;&gt;</span> <span 
class="n">beam</span><span class="o">.</span><span 
class="n">WindowInto</span><span class="p">(</span><span 
class="n">window</span><span class="o">.</span><span 
class="n">Sessions</span><span class="p">(</span><span 
class="mi">10</span> <span class="o">*</span> <span class="mi">60</span><span class="p">)))</span>
+
+</code></pre>
+</div>
+
+<p>Note that the sessions are per-key — each key in the collection will have 
its own session groupings depending on the data distribution.</p>
+
+<h4 id="setting-a-single-global-window">Setting a single global window</h4>
+
+<p>If your <code class="highlighter-rouge">PCollection</code> is bounded (the 
size is fixed), you can assign all the elements to a single global window. The 
following example code shows how to set a single global window for a <code 
class="highlighter-rouge">PCollection</code>:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>    
<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">items</span> 
<span class="o">=</span> <span class="o">...;</span>
+    <span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">batch_items</span> <span class="o">=</span> <span 
class="n">items</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span>
+        <span class="n">Window</span><span class="o">.&lt;</span><span 
class="n">String</span><span class="o">&gt;</span><span 
class="n">into</span><span class="o">(</span><span class="k">new</span> <span 
class="n">GlobalWindows</span><span class="o">()));</span>
+</code></pre>
+</div>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="kn">from</span> <span class="nn">apache_beam</span> <span 
class="kn">import</span> <span class="n">window</span>
+<span class="n">batch_items</span> <span class="o">=</span> <span 
class="p">(</span>
+    <span class="n">items</span> <span class="o">|</span> <span 
class="s">'window'</span> <span class="o">&gt;&gt;</span> <span 
class="n">beam</span><span class="o">.</span><span 
class="n">WindowInto</span><span class="p">(</span><span 
class="n">window</span><span class="o">.</span><span 
class="n">GlobalWindows</span><span class="p">()))</span>
+
+</code></pre>
+</div>
+
+<h3 id="time-skew-data-lag-and-late-data">Time skew, data lag, and late 
data</h3>
+
+<p>In any data processing system, there is a certain amount of lag between the 
time a data event occurs (the “event time”, determined by the timestamp on 
the data element itself) and the time the actual data element gets processed at 
any stage in your pipeline (the “processing time”, determined by the clock 
on the system processing the element). In addition, there are no guarantees 
that data events will appear in your pipeline in the same order that they were 
generated.</p>
+
+<p>For example, let’s say we have a <code 
class="highlighter-rouge">PCollection</code> that’s using fixed-time 
windowing, with windows that are five minutes long. For each window, Beam must 
collect all the data with an <em>event time</em> timestamp in the given window 
range (between 0:00 and 4:59 in the first window, for instance). Data with 
timestamps outside that range (data from 5:00 or later) belong to a different 
window.</p>
+
+<p>However, data isn’t always guaranteed to arrive in a pipeline in time 
order, or to always arrive at predictable intervals. Beam tracks a 
<em>watermark</em>, which is the system’s notion of when all data in a 
certain window can be expected to have arrived in the pipeline. Once the 
watermark passes the end of a window, any further element that arrives with a 
timestamp in that window is considered <strong>late data</strong>.</p>
+
+<p>Continuing our example, suppose we have a simple watermark that assumes 
approximately 30s of lag time between the data timestamps (the event time) and 
the time the data appears in the pipeline (the processing time). Beam would 
then close the first window at 5:30. If a data record arrives at 5:34, but 
with a timestamp that would put it in the 0:00-4:59 window (say, 3:38), then 
that record is late data.</p>
+
+<p>Note: For simplicity, we’ve assumed that we’re using a very 
straightforward watermark that estimates the lag time/time skew. In practice, 
your <code class="highlighter-rouge">PCollection</code>’s data source 
determines the watermark, and watermarks can be more precise or complex.</p>
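The arithmetic of this example can be checked with a small plain-Python sketch. It models only the simplified watermark assumed above (processing time minus a fixed 30s lag). The names `watermark`, `classify`, and `mmss` are hypothetical, and the `allowed_lateness` parameter stands in for the allowed-lateness setting covered in the next subsection. Real watermarks come from the data source and are generally not a fixed offset.

```python
LAG_SECONDS = 30          # assumed fixed lag of the simplified watermark
WINDOW_SECONDS = 5 * 60   # five-minute fixed windows


def mmss(minutes, seconds):
    """Convert a m:ss clock reading into seconds."""
    return minutes * 60 + seconds


def watermark(processing_time, lag=LAG_SECONDS):
    """Simplified watermark: event time up to which data is assumed complete."""
    return processing_time - lag


def classify(event_time, processing_time, allowed_lateness=0):
    """Classify a record relative to the end of its fixed window."""
    window_end = event_time - (event_time % WINDOW_SECONDS) + WINDOW_SECONDS
    wm = watermark(processing_time)
    if wm < window_end:
        return "on time"
    if wm < window_end + allowed_lateness:
        return "late, accepted"
    return "late, dropped"


# A record stamped 3:38 that shows up at 5:34: the first window closed at 5:30.
print(classify(mmss(3, 38), mmss(5, 34)))
# The same record when two days of lateness are allowed.
print(classify(mmss(3, 38), mmss(5, 34), allowed_lateness=2 * 24 * 60 * 60))
```

The first call reports the record as late and dropped, while the second accepts it because the watermark has not yet passed the window end plus the allowed lateness.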
+
+<h4 id="managing-time-skew-and-late-data">Managing time skew and late data</h4>
+
+<blockquote>
+  <p><strong>Note:</strong> Managing time skew and late data is not supported 
in the Beam SDK for Python.</p>
+</blockquote>
+
+<p>You can allow late data by invoking the <code 
class="highlighter-rouge">.withAllowedLateness</code> operation when you set 
your <code class="highlighter-rouge">PCollection</code>’s windowing strategy. 
The following code example demonstrates a windowing strategy that will allow 
late data up to two days after the end of a window.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>    
<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">items</span> 
<span class="o">=</span> <span class="o">...;</span>
+    <span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">fixed_windowed_items</span> <span class="o">=</span> <span 
class="n">items</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span>
+        <span class="n">Window</span><span class="o">.&lt;</span><span 
class="n">String</span><span class="o">&gt;</span><span 
class="n">into</span><span class="o">(</span><span 
class="n">FixedWindows</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">standardMinutes</span><span class="o">(</span><span 
class="mi">1</span><span class="o">)))</span>
+              <span class="o">.</span><span 
class="na">withAllowedLateness</span><span class="o">(</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">standardDays</span><span class="o">(</span><span 
class="mi">2</span><span class="o">)));</span>
+</code></pre>
+</div>
+
+<p>When you set <code class="highlighter-rouge">.withAllowedLateness</code> on 
a <code class="highlighter-rouge">PCollection</code>, that allowed lateness 
propagates forward to any subsequent <code 
class="highlighter-rouge">PCollection</code> derived from the first <code 
class="highlighter-rouge">PCollection</code> you applied allowed lateness to. 
If you want to change the allowed lateness later in your pipeline, you must do 
so explicitly by applying <code 
class="highlighter-rouge">Window.withAllowedLateness()</code> again.</p>
+
+<p>You can also use triggers to help you refine the windowing strategy for a 
<code class="highlighter-rouge">PCollection</code>. You can use triggers to 
determine exactly when each individual window aggregates and reports its 
results, including how the window emits late elements.</p>
+
+<h3 id="adding-timestamps-to-a-pcollections-elements">Adding timestamps to a 
PCollection’s elements</h3>
+
+<p>An unbounded source provides a timestamp for each element. Depending on 
your unbounded source, you may need to configure how the timestamp is extracted 
from the raw data stream.</p>
+
+<p>However, bounded sources (such as a file from <code 
class="highlighter-rouge">TextIO</code>) do not provide timestamps. If you need 
timestamps, you must add them to your <code 
class="highlighter-rouge">PCollection</code>’s elements.</p>
+
+<p>You can assign new timestamps to the elements of a <code 
class="highlighter-rouge">PCollection</code> by applying a <a 
href="#transforms-pardo">ParDo</a> transform that outputs new elements with 
timestamps that you set.</p>
+
+<p>For example, suppose your pipeline reads log records from an input file, 
and each log record includes a timestamp field. Because the pipeline reads the 
records from a file, the file source doesn’t assign timestamps 
automatically. You can parse the timestamp field from each record and use a 
<code class="highlighter-rouge">ParDo</code> transform with a <code 
class="highlighter-rouge">DoFn</code> to attach a timestamp to each element 
in your <code class="highlighter-rouge">PCollection</code>.</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>     
 <span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">LogEntry</span><span class="o">&gt;</span> <span 
class="n">unstampedLogs</span> <span class="o">=</span> <span 
class="o">...;</span>
+      <span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">LogEntry</span><span class="o">&gt;</span> <span 
class="n">stampedLogs</span> <span class="o">=</span>
+          <span class="n">unstampedLogs</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span 
class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span class="n">DoFn</span><span 
class="o">&lt;</span><span class="n">LogEntry</span><span class="o">,</span> 
<span class="n">LogEntry</span><span class="o">&gt;()</span> <span 
class="o">{</span>
+            <span class="nd">@ProcessElement</span>
+            <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">processElement</span><span class="o">(</span><span 
class="n">ProcessContext</span> <span class="n">c</span><span 
class="o">)</span> <span class="o">{</span>
+              <span class="c1">// Extract the timestamp from the log entry we're 
currently processing.</span>
+              <span class="n">Instant</span> <span 
class="n">logTimeStamp</span> <span class="o">=</span> <span 
class="n">extractTimeStampFromLogEntry</span><span class="o">(</span><span 
class="n">c</span><span class="o">.</span><span class="na">element</span><span 
class="o">());</span>
+              <span class="c1">// Use ProcessContext.outputWithTimestamp 
(rather than</span>
+              <span class="c1">// ProcessContext.output) to emit the entry 
with timestamp attached.</span>
+              <span class="n">c</span><span class="o">.</span><span 
class="na">outputWithTimestamp</span><span class="o">(</span><span 
class="n">c</span><span class="o">.</span><span class="na">element</span><span 
class="o">(),</span> <span class="n">logTimeStamp</span><span 
class="o">);</span>
+            <span class="o">}</span>
+          <span class="o">}));</span>
+</code></pre>
+</div>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="k">class</span> <span class="nc">AddTimestampDoFn</span><span 
class="p">(</span><span class="n">beam</span><span class="o">.</span><span 
class="n">DoFn</span><span class="p">):</span>
+
+  <span class="k">def</span> <span class="nf">process</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="n">element</span><span class="p">):</span>
+    <span class="c"># Extract the numeric Unix seconds-since-epoch timestamp 
to be</span>
+    <span class="c"># associated with the current log entry.</span>
+    <span class="n">unix_timestamp</span> <span class="o">=</span> <span 
class="n">extract_timestamp_from_log_entry</span><span class="p">(</span><span 
class="n">element</span><span class="p">)</span>
+    <span class="c"># Wrap and emit the current entry and new timestamp in 
a</span>
+    <span class="c"># TimestampedValue.</span>
+    <span class="k">yield</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">TimestampedValue</span><span 
class="p">(</span><span class="n">element</span><span class="p">,</span> <span 
class="n">unix_timestamp</span><span class="p">)</span>
+
+<span class="n">timestamped_items</span> <span class="o">=</span> <span 
class="n">items</span> <span class="o">|</span> <span 
class="s">'timestamp'</span> <span class="o">&gt;&gt;</span> <span 
class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span 
class="p">(</span><span class="n">AddTimestampDoFn</span><span 
class="p">())</span>
+
+</code></pre>
+</div>
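+
+<p>The examples above leave the timestamp-extraction helper undefined. A minimal sketch of what <code class="highlighter-rouge">extract_timestamp_from_log_entry</code> might look like follows. It assumes each log entry is a string that begins with an ISO 8601 UTC timestamp such as <code class="highlighter-rouge">2017-03-20T14:56:42Z</code> followed by a space. That log format is an assumption made for illustration and is not something the guide specifies.</p>

```python
from datetime import datetime, timezone


def extract_timestamp_from_log_entry(entry):
    """Hypothetical helper: parse the leading ISO 8601 UTC timestamp of a log
    line and return it as Unix seconds since the epoch."""
    # Assumed format: '<YYYY-MM-DDTHH:MM:SSZ> <rest of the log line>'.
    timestamp_field = entry.split(" ", 1)[0]
    parsed = datetime.strptime(timestamp_field, "%Y-%m-%dT%H:%M:%SZ")
    # strptime returns a naive datetime; mark it as UTC before converting.
    return parsed.replace(tzinfo=timezone.utc).timestamp()
```

+<p>With a different log format, only the parsing step would change. The <code class="highlighter-rouge">DoFn</code> still receives a numeric timestamp in seconds to attach to the element.</p>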
+
+<h2 id="a-nametriggersaworking-with-triggers"><a name="triggers"></a>Working 
with triggers</h2>
 
 <blockquote>
   <p><strong>Note:</strong> This guide is still in progress. There is an open 
issue to finish the guide (<a 
href="https://issues.apache.org/jira/browse/BEAM-193">BEAM-193</a>)</p>

http://git-wip-us.apache.org/repos/asf/beam-site/blob/fa7d6168/content/images/fixed-time-windows.png
----------------------------------------------------------------------
diff --git a/content/images/fixed-time-windows.png 
b/content/images/fixed-time-windows.png
new file mode 100644
index 0000000..832dc64
Binary files /dev/null and b/content/images/fixed-time-windows.png differ

http://git-wip-us.apache.org/repos/asf/beam-site/blob/fa7d6168/content/images/session-windows.png
----------------------------------------------------------------------
diff --git a/content/images/session-windows.png 
b/content/images/session-windows.png
new file mode 100644
index 0000000..3ce844c
Binary files /dev/null and b/content/images/session-windows.png differ

http://git-wip-us.apache.org/repos/asf/beam-site/blob/fa7d6168/content/images/sliding-time-windows.png
----------------------------------------------------------------------
diff --git a/content/images/sliding-time-windows.png 
b/content/images/sliding-time-windows.png
new file mode 100644
index 0000000..056732b
Binary files /dev/null and b/content/images/sliding-time-windows.png differ

http://git-wip-us.apache.org/repos/asf/beam-site/blob/fa7d6168/content/images/unwindowed-pipeline-bounded.png
----------------------------------------------------------------------
diff --git a/content/images/unwindowed-pipeline-bounded.png 
b/content/images/unwindowed-pipeline-bounded.png
new file mode 100644
index 0000000..7725f34
Binary files /dev/null and b/content/images/unwindowed-pipeline-bounded.png 
differ

http://git-wip-us.apache.org/repos/asf/beam-site/blob/fa7d6168/content/images/windowing-pipeline-bounded.png
----------------------------------------------------------------------
diff --git a/content/images/windowing-pipeline-bounded.png 
b/content/images/windowing-pipeline-bounded.png
new file mode 100644
index 0000000..198ed11
Binary files /dev/null and b/content/images/windowing-pipeline-bounded.png 
differ

http://git-wip-us.apache.org/repos/asf/beam-site/blob/fa7d6168/content/images/windowing-pipeline-unbounded.png
----------------------------------------------------------------------
diff --git a/content/images/windowing-pipeline-unbounded.png 
b/content/images/windowing-pipeline-unbounded.png
new file mode 100644
index 0000000..b5c5ee0
Binary files /dev/null and b/content/images/windowing-pipeline-unbounded.png 
differ
