This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit e96fe9aaaa26bd24fcebd3988b65540395a60a13
Author: Mergebot <[email protected]>
AuthorDate: Mon Sep 18 22:26:57 2017 +0000

    Prepare repository for deployment.
---
 content/documentation/programming-guide/index.html |   2 +-
 .../get-started/mobile-gaming-example/index.html   | 445 ++++++++++++++-------
 2 files changed, 308 insertions(+), 139 deletions(-)

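The mobile gaming page updated in this commit describes `HourlyTeamScore`. That pipeline keeps only events between a start and an end time, assigns each event to a fixed event-time window, and sums scores per team within each window. The following is a minimal plain-Python sketch of that computation. It does not use Beam's API. The function name `hourly_team_scores` is hypothetical. Events are assumed to be dicts with `team`, `score`, and `timestamp` keys; the key names follow the Python snippets in the diff, and treating timestamps as seconds is an assumption.

```python
from collections import defaultdict
from typing import Dict, Iterable, Mapping, Tuple


def hourly_team_scores(
    events: Iterable[Mapping],
    window_seconds: int,
    start_ts: float,
    end_ts: float,
) -> Dict[Tuple[str, float], int]:
    """Sum scores per (team, window start) in fixed event-time windows.

    Events outside the open interval (start_ts, end_ts) are dropped. This
    mirrors the strict comparisons used in the filter steps of the example.
    """
    totals: Dict[Tuple[str, float], int] = defaultdict(int)
    for event in events:
        ts = event["timestamp"]
        if not (start_ts < ts < end_ts):
            continue  # outside the analysis range
        # Fixed windows with no offset start at a multiple of the window size.
        window_start = ts - (ts % window_seconds)
        totals[(event["team"], window_start)] += event["score"]
    return dict(totals)


if __name__ == "__main__":
    sample = [
        {"team": "red", "score": 5, "timestamp": 10},
        {"team": "red", "score": 3, "timestamp": 3500},
        {"team": "red", "score": 4, "timestamp": 3700},
        {"team": "blue", "score": 2, "timestamp": 100},
        {"team": "blue", "score": 9, "timestamp": 9000},  # after end_ts, dropped
    ]
    print(hourly_team_scores(sample, 3600, 0, 8000))
    # {('red', 0): 8, ('red', 3600): 4, ('blue', 0): 2}
```

Each output key is keyed by event time rather than arrival order. A late-arriving event therefore still lands in the hour in which it occurred.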
diff --git a/content/documentation/programming-guide/index.html b/content/documentation/programming-guide/index.html
index 0234c78..8a9ab93 100644
--- a/content/documentation/programming-guide/index.html
+++ b/content/documentation/programming-guide/index.html
@@ -2016,7 +2016,7 @@ Subsequent transforms, however, are applied to the result of the <code class="hi
 
 <p>This code sample sets a time-based trigger for a <code 
class="highlighter-rouge">PCollection</code>, which emits results one minute 
after the first element in that window has been processed. The last line in the 
code sample, <code class="highlighter-rouge">.discardingFiredPanes()</code>, is 
the window’s <strong>accumulation mode</strong>.</p>
 
-<h4 id="window-accumulation-modes">Window Accumulation Modes</h4>
+<h4 id="a-namewindow-accumulation-modesawindow-accumulation-modes"><a 
name="window-accumulation-modes"></a>Window Accumulation Modes</h4>
 
 <p>When you specify a trigger, you must also set the the window’s 
<strong>accumulation mode</strong>. When a trigger fires, it emits the current 
contents of the window as a pane. Since a trigger can fire multiple times, the 
accumulation mode determines whether the system <em>accumulates</em> the window 
panes as the trigger fires, or <em>discards</em> them.</p>
 
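The accumulation-mode paragraph in the hunk above distinguishes accumulating panes from discarding panes. To make the difference concrete, the following plain-Python simulation models a single window whose trigger fires three times. It models only the contents of each emitted pane, not Beam's API, and the function name `fire_panes` is hypothetical.

```python
from typing import List


def fire_panes(batches: List[List[int]], accumulating: bool) -> List[List[int]]:
    """Simulate the panes a trigger emits for one window.

    Each entry in `batches` holds the elements that arrived between two
    consecutive trigger firings. In accumulating mode every pane contains
    all elements seen so far in the window; in discarding mode each pane
    contains only the elements that arrived since the previous firing.
    """
    panes: List[List[int]] = []
    seen: List[int] = []
    for batch in batches:
        if accumulating:
            seen = seen + batch
            panes.append(list(seen))
        else:
            panes.append(list(batch))
    return panes


if __name__ == "__main__":
    arrivals = [[5, 8], [3], [15, 19, 23]]
    print(fire_panes(arrivals, accumulating=True))
    # [[5, 8], [5, 8, 3], [5, 8, 3, 15, 19, 23]]
    print(fire_panes(arrivals, accumulating=False))
    # [[5, 8], [3], [15, 19, 23]]
```

If each pane were summed, accumulating mode would emit 13, 16, and 73, and discarding mode would emit 13, 3, and 57. Accumulating mode suits consumers that overwrite a previous result. Discarding mode suits consumers that add up deltas themselves. In the Beam Java SDK, these modes are selected with `.accumulatingFiredPanes()` and `.discardingFiredPanes()` on the window transform.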
diff --git a/content/get-started/mobile-gaming-example/index.html b/content/get-started/mobile-gaming-example/index.html
index 1826ada..dbea94c 100644
--- a/content/get-started/mobile-gaming-example/index.html
+++ b/content/get-started/mobile-gaming-example/index.html
@@ -148,7 +148,6 @@
 <ul id="markdown-toc">
   <li><a href="#userscore-basic-score-processing-in-batch" 
id="markdown-toc-userscore-basic-score-processing-in-batch">UserScore: Basic 
Score Processing in Batch</a>    <ul>
       <li><a href="#what-does-userscore-do" 
id="markdown-toc-what-does-userscore-do">What Does UserScore Do?</a></li>
-      <li><a href="#working-with-the-results" 
id="markdown-toc-working-with-the-results">Working with the Results</a></li>
       <li><a href="#limitations" 
id="markdown-toc-limitations">Limitations</a></li>
     </ul>
   </li>
@@ -181,7 +180,7 @@
 </ul>
 
 <nav class="language-switcher">
-  <strong>Adapt for:</strong> 
+  <strong>Adapt for:</strong>
   <ul>
     <li data-type="language-java">Java SDK</li>
     <li data-type="language-py">Python SDK</li>
@@ -207,14 +206,14 @@
   <li>A timestamp that records when the particular instance of play 
happened–this is the event time for each game data event.</li>
 </ul>
 
-<p>When the user completes an instance of the game, their phone sends the data 
event to a game server, where the data is logged and stored in a file. 
Generally the data is sent to the game server immediately upon completion. 
However, sometimes delays happen in the network or users play the game 
“offline”, when their phones are out of contact with the server (such as on an 
airplane, or outside network coverage area). When the user’s phone comes back 
into contact with the game server, the  [...]
+<p>When the user completes an instance of the game, their phone sends the data 
event to a game server, where the data is logged and stored in a file. 
Generally the data is sent to the game server immediately upon completion. 
However, sometimes delays can happen in the network at various points. Another 
possible scenario involves users who play the game “offline”, when their phones 
are out of contact with the server (such as on an airplane, or outside network 
coverage area). When the user [...]
 
-<p>The following diagram shows the ideal situation vs reality. The X-axis 
represents event time: the actual time a game event occurred. The Y-axis 
represents processing time: the time at which a game event was processed. 
Ideally, events should be processed as they occur, depicted by the dotted line 
in the diagram. However, in reality that is not the case and reality looks more 
like what is depicted by the red squiggly line.</p>
+<p>The following diagram shows the ideal situation (events are processed as 
they occur) vs. reality (there is often a time delay before processing).</p>
 
 <figure id="fig1">
     <img src="/images/gaming-example-basic.png" width="264" height="260" 
alt="Score data for three users." />
 </figure>
-<p>Figure 1: Ideally, events are processed when they occur, with no delays.</p>
+<p><strong>Figure 1:</strong> The X-axis represents event time: the actual 
time a game event occurred. The Y-axis represents processing time: the time at 
which a game event was processed. Ideally, events should be processed as they 
occur, depicted by the dotted line in the diagram. However, in reality that is 
not the case and it looks more like what is depicted by the red squiggly 
line.</p>
 
 <p>The data events might be received by the game server significantly later 
than users generate them. This time difference (called <strong>skew</strong>) 
can have processing implications for pipelines that make calculations that 
consider when each score was generated. Such pipelines might track scores 
generated during each hour of a day, for example, or they calculate the length 
of time that users are continuously playing the game—both of which depend on 
each data record’s event time.</p>
 
@@ -222,7 +221,7 @@
 
 <p>For pipelines that read unbounded game data from an unbounded source, the 
data source sets the intrinsic <a 
href="/documentation/programming-guide/#pctimestamps">timestamp</a> for each 
PCollection element to the appropriate event time.</p>
 
-<p>The Mobile Game example pipelines vary in complexity, from simple batch 
analysis to more complex pipelines that can perform real-time analysis and 
abuse detection. This section walks you through each example and demonstrates 
how to use Beam features like windowing and triggers to expand your pipeline’s 
capabilites.</p>
+<p>The Mobile Gaming example pipelines vary in complexity, from simple batch 
analysis to more complex pipelines that can perform real-time analysis and 
abuse detection. This section walks you through each example and demonstrates 
how to use Beam features like windowing and triggers to expand your pipeline’s 
capabilities.</p>
 
 <h2 id="userscore-basic-score-processing-in-batch">UserScore: Basic Score 
Processing in Batch</h2>
 
@@ -251,9 +250,9 @@
 <p><code class="highlighter-rouge">UserScore</code>’s basic pipeline flow does 
the following:</p>
 
 <ol>
-  <li>Read the day’s score data from a file stored in a text file.</li>
+  <li>Read the day’s score data from a text file.</li>
   <li>Sum the score values for each unique user by grouping each game event by 
user ID and combining the score values to get the total score for that 
particular user.</li>
-  <li>Write the result data to a <a 
href="https://cloud.google.com/bigquery/">Google Cloud BigQuery</a> table.</li>
+  <li>Write the result data to a text file.</li>
 </ol>
 
 <p>The following diagram shows score data for several users over the pipeline 
analysis period. In the diagram, each data point is an event that results in 
one user/score pair:</p>
@@ -261,7 +260,7 @@
 <figure id="fig2">
     <img src="/images/gaming-example.gif" width="900" height="263" alt="Score 
data for three users." />
 </figure>
-<p>Figure 2: Score data for three users.</p>
+<p><strong>Figure 2:</strong> Score data for three users.</p>
 
 <p>This example uses batch processing, and the diagram’s Y axis represents 
processing time: the pipeline processes events lower on the Y-axis first, and 
events higher up the axis later. The diagram’s X axis represents the event time 
for each game event, as denoted by that event’s timestamp. Note that the 
individual events in the diagram are not processed by the pipeline in the same 
order as they occurred (according to their timestamps).</p>
 
@@ -282,15 +281,13 @@
 
     <span class="k">return</span> <span class="n">gameInfo</span>
       <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="n">MapElements</span>
-          <span class="o">.</span><span class="na">into</span><span 
class="o">(</span>
-              <span class="n">TypeDescriptors</span><span 
class="o">.</span><span class="na">kvs</span><span class="o">(</span><span 
class="n">TypeDescriptors</span><span class="o">.</span><span 
class="na">strings</span><span class="o">(),</span> <span 
class="n">TypeDescriptors</span><span class="o">.</span><span 
class="na">integers</span><span class="o">()))</span>
+          <span class="o">.</span><span class="na">into</span><span 
class="o">(</span><span class="n">TypeDescriptors</span><span 
class="o">.</span><span class="na">kvs</span><span class="o">(</span><span 
class="n">TypeDescriptors</span><span class="o">.</span><span 
class="na">strings</span><span class="o">(),</span> <span 
class="n">TypeDescriptors</span><span class="o">.</span><span 
class="na">integers</span><span class="o">()))</span>
           <span class="o">.</span><span class="na">via</span><span 
class="o">((</span><span class="n">GameActionInfo</span> <span 
class="n">gInfo</span><span class="o">)</span> <span class="o">-&gt;</span> 
<span class="n">KV</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="n">gInfo</span><span 
class="o">.</span><span class="na">getKey</span><span class="o">(</span><span 
class="n">field</span><span class="o">),</span> <span class="n">gInfo</span 
[...]
       <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="n">Sum</span><span class="o">.&lt;</span><span 
class="n">String</span><span class="o">&gt;</span><span 
class="n">integersPerKey</span><span class="o">());</span>
   <span class="o">}</span>
 <span class="o">}</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="k">class</span> <span class="nc">ExtractAndSumScore</span><span 
class="p">(</span><span class="n">beam</span><span class="o">.</span><span 
class="n">PTransform</span><span class="p">):</span>
   <span class="s">"""A transform to extract key/score information and sum the 
scores.
   The constructor argument `field` determines whether 'team' or 'user' info is
@@ -302,14 +299,8 @@
 
   <span class="k">def</span> <span class="nf">expand</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="n">pcoll</span><span class="p">):</span>
     <span class="k">return</span> <span class="p">(</span><span 
class="n">pcoll</span>
-            <span class="o">|</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Map</span><span class="p">(</span><span 
class="k">lambda</span> <span class="n">info</span><span class="p">:</span> 
<span class="p">(</span><span class="n">info</span><span 
class="p">[</span><span class="bp">self</span><span class="o">.</span><span 
class="n">field</span><span class="p">],</span> <span 
class="n">info</span><span class="p">[</span><span 
class="s">'score'</span><span clas [...]
-            <span class="o">|</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">CombinePerKey</span><span 
class="p">(</span><span class="n">sum_ints</span><span class="p">))</span>
-
-<span class="k">def</span> <span 
class="nf">configure_bigquery_write</span><span class="p">():</span>
-  <span class="k">return</span> <span class="p">[</span>
-      <span class="p">(</span><span class="s">'user'</span><span 
class="p">,</span> <span class="s">'STRING'</span><span class="p">,</span> 
<span class="k">lambda</span> <span class="n">e</span><span class="p">:</span> 
<span class="n">e</span><span class="p">[</span><span class="mi">0</span><span 
class="p">]),</span>
-      <span class="p">(</span><span class="s">'total_score'</span><span 
class="p">,</span> <span class="s">'INTEGER'</span><span class="p">,</span> 
<span class="k">lambda</span> <span class="n">e</span><span class="p">:</span> 
<span class="n">e</span><span class="p">[</span><span class="mi">1</span><span 
class="p">]),</span>
-  <span class="p">]</span>
+            <span class="o">|</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Map</span><span class="p">(</span><span 
class="k">lambda</span> <span class="n">elem</span><span class="p">:</span> 
<span class="p">(</span><span class="n">elem</span><span 
class="p">[</span><span class="bp">self</span><span class="o">.</span><span 
class="n">field</span><span class="p">],</span> <span 
class="n">elem</span><span class="p">[</span><span 
class="s">'score'</span><span clas [...]
+            <span class="o">|</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">CombinePerKey</span><span 
class="p">(</span><span class="nb">sum</span><span class="p">))</span>
 </code></pre>
 </div>
 
@@ -323,46 +314,47 @@
   <span class="n">Pipeline</span> <span class="n">pipeline</span> <span 
class="o">=</span> <span class="n">Pipeline</span><span class="o">.</span><span 
class="na">create</span><span class="o">(</span><span 
class="n">options</span><span class="o">);</span>
 
   <span class="c1">// Read events from a text file and parse them.</span>
-  <span class="n">pipeline</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span 
class="n">TextIO</span><span class="o">.</span><span 
class="na">read</span><span class="o">().</span><span 
class="na">from</span><span class="o">(</span><span 
class="n">options</span><span class="o">.</span><span 
class="na">getInput</span><span class="o">()))</span>
-    <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"ParseGameEvent"</span><span 
class="o">,</span> <span class="n">ParDo</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="n">ParseEventFn</span><span class="o">()))</span>
-    <span class="c1">// Extract and sum username/score pairs from the event 
data.</span>
-    <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"ExtractUserScore"</span><span 
class="o">,</span> <span class="k">new</span> <span 
class="n">ExtractAndSumScore</span><span class="o">(</span><span 
class="s">"user"</span><span class="o">))</span>
-    <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"WriteUserScoreSums"</span><span 
class="o">,</span>
-        <span class="k">new</span> <span class="n">WriteToBigQuery</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;(</span><span 
class="n">options</span><span class="o">.</span><span 
class="na">getTableName</span><span class="o">(),</span>
-                                                 <span 
class="n">configureBigQueryWrite</span><span class="o">()));</span>
+  <span class="n">pipeline</span>
+      <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="n">TextIO</span><span class="o">.</span><span 
class="na">read</span><span class="o">().</span><span 
class="na">from</span><span class="o">(</span><span 
class="n">options</span><span class="o">.</span><span 
class="na">getInput</span><span class="o">()))</span>
+      <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"ParseGameEvent"</span><span 
class="o">,</span> <span class="n">ParDo</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="n">ParseEventFn</span><span class="o">()))</span>
+      <span class="c1">// Extract and sum username/score pairs from the event 
data.</span>
+      <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"ExtractUserScore"</span><span 
class="o">,</span> <span class="k">new</span> <span 
class="n">ExtractAndSumScore</span><span class="o">(</span><span 
class="s">"user"</span><span class="o">))</span>
+      <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span>
+          <span class="s">"WriteUserScoreSums"</span><span class="o">,</span>
+          <span class="k">new</span> <span class="n">WriteToText</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;(</span>
+              <span class="n">options</span><span class="o">.</span><span 
class="na">getOutput</span><span class="o">(),</span>
+              <span class="n">configureOutput</span><span class="o">(),</span>
+              <span class="kc">false</span><span class="o">));</span>
 
   <span class="c1">// Run the batch pipeline.</span>
   <span class="n">pipeline</span><span class="o">.</span><span 
class="na">run</span><span class="o">().</span><span 
class="na">waitUntilFinish</span><span class="o">();</span>
 <span class="o">}</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="k">def</span> <span class="nf">run</span><span class="p">(</span><span 
class="n">argv</span><span class="o">=</span><span class="bp">None</span><span 
class="p">):</span>
   <span class="s">"""Main entry point; defines and runs the user_score 
pipeline."""</span>
-  
-  <span class="o">...</span>
-
-  <span class="n">pipeline_options</span> <span class="o">=</span> <span 
class="n">PipelineOptions</span><span class="p">(</span><span 
class="n">pipeline_args</span><span class="p">)</span>
-  <span class="n">p</span> <span class="o">=</span> <span 
class="n">beam</span><span class="o">.</span><span 
class="n">Pipeline</span><span class="p">(</span><span 
class="n">options</span><span class="o">=</span><span 
class="n">pipeline_options</span><span class="p">)</span>
-
-  <span class="p">(</span><span class="n">p</span>  <span class="c"># pylint: 
disable=expression-not-assigned</span>
-   <span class="o">|</span> <span class="n">ReadFromText</span><span 
class="p">(</span><span class="n">known_args</span><span 
class="o">.</span><span class="nb">input</span><span class="p">)</span> <span 
class="c"># Read events from a file and parse them.</span>
-   <span class="o">|</span> <span class="n">UserScore</span><span 
class="p">()</span>
-   <span class="o">|</span> <span class="n">WriteToBigQuery</span><span 
class="p">(</span>
-       <span class="n">known_args</span><span class="o">.</span><span 
class="n">table_name</span><span class="p">,</span> <span 
class="n">known_args</span><span class="o">.</span><span 
class="n">dataset</span><span class="p">,</span> <span 
class="n">configure_bigquery_write</span><span class="p">()))</span>
-
-  <span class="n">result</span> <span class="o">=</span> <span 
class="n">p</span><span class="o">.</span><span class="n">run</span><span 
class="p">()</span>
-  <span class="n">result</span><span class="o">.</span><span 
class="n">wait_until_finish</span><span class="p">()</span>
-</code></pre>
-</div>
-
-<h3 id="working-with-the-results">Working with the Results</h3>
-
-<p><code class="highlighter-rouge">UserScore</code> writes the data to a 
BigQuery table (called <code class="highlighter-rouge">user_score</code> by 
default). With the data in the BigQuery table, we might perform a further 
interactive analysis, such as querying for a list of the N top-scoring users 
for a given day.</p>
-
-<p>Let’s suppose we want to interactively determine the top 10 highest-scoring 
users for a given day. In the BigQuery user interface, we can run the following 
query:</p>
-
-<div class="highlighter-rouge"><pre class="highlight"><code>SELECT * FROM 
[MyGameProject:MyGameDataset.user_score] ORDER BY total_score DESC LIMIT 10
+  <span class="n">parser</span> <span class="o">=</span> <span 
class="n">argparse</span><span class="o">.</span><span 
class="n">ArgumentParser</span><span class="p">()</span>
+
+  <span class="c"># The default maps to two large Google Cloud Storage files 
(each ~12GB)</span>
+  <span class="c"># holding two subsequent day's worth (roughly) of 
data.</span>
+  <span class="n">parser</span><span class="o">.</span><span 
class="n">add_argument</span><span class="p">(</span><span 
class="s">'--input'</span><span class="p">,</span>
+                      <span class="nb">type</span><span 
class="o">=</span><span class="nb">str</span><span class="p">,</span>
+                      <span class="n">default</span><span 
class="o">=</span><span 
class="s">'gs://apache-beam-samples/game/gaming_data*.csv'</span><span 
class="p">,</span>
+                      <span class="n">help</span><span class="o">=</span><span 
class="s">'Path to the data file(s) containing game data.'</span><span 
class="p">)</span>
+  <span class="n">parser</span><span class="o">.</span><span 
class="n">add_argument</span><span class="p">(</span><span 
class="s">'--output'</span><span class="p">,</span>
+                      <span class="nb">type</span><span 
class="o">=</span><span class="nb">str</span><span class="p">,</span>
+                      <span class="n">required</span><span 
class="o">=</span><span class="bp">True</span><span class="p">,</span>
+                      <span class="n">help</span><span class="o">=</span><span 
class="s">'Path to the output file(s).'</span><span class="p">)</span>
+
+  <span class="n">args</span><span class="p">,</span> <span 
class="n">pipeline_args</span> <span class="o">=</span> <span 
class="n">parser</span><span class="o">.</span><span 
class="n">parse_known_args</span><span class="p">(</span><span 
class="n">argv</span><span class="p">)</span>
+
+  <span class="k">with</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Pipeline</span><span class="p">(</span><span 
class="n">argv</span><span class="o">=</span><span 
class="n">pipeline_args</span><span class="p">)</span> <span 
class="k">as</span> <span class="n">p</span><span class="p">:</span>
+    <span class="p">(</span><span class="n">p</span>  <span class="c"># 
pylint: disable=expression-not-assigned</span>
+     <span class="o">|</span> <span class="s">'ReadInputText'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">io</span><span class="o">.</span><span 
class="n">ReadFromText</span><span class="p">(</span><span 
class="n">args</span><span class="o">.</span><span class="nb">input</span><span 
class="p">)</span>
+     <span class="o">|</span> <span class="s">'UserScore'</span> <span 
class="o">&gt;&gt;</span> <span class="n">UserScore</span><span 
class="p">()</span>
+     <span class="o">|</span> <span class="s">'FormatUserScoreSums'</span> 
<span class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Map</span><span class="p">(</span>
+         <span class="k">lambda</span> <span class="n">user_score</span><span 
class="p">:</span> <span class="s">'user: </span><span 
class="si">%</span><span class="s">s, total_score: </span><span 
class="si">%</span><span class="s">s'</span> <span class="o">%</span> <span 
class="n">user_score</span><span class="p">)</span>
+     <span class="o">|</span> <span class="s">'WriteUserScoreSums'</span> 
<span class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">io</span><span class="o">.</span><span 
class="n">WriteToText</span><span class="p">(</span><span 
class="n">args</span><span class="o">.</span><span class="n">output</span><span 
class="p">))</span>
 </code></pre>
 </div>
 
@@ -391,7 +383,7 @@
 
 <p>The <code class="highlighter-rouge">HourlyTeamScore</code> pipeline expands 
on the basic batch analysis principles used in the <code 
class="highlighter-rouge">UserScore</code> pipeline and improves upon some of 
its limitations. <code class="highlighter-rouge">HourlyTeamScore</code> 
performs finer-grained analysis, both by using additional features in the Beam 
SDKs, and taking into account more aspects of the game data. For example, <code 
class="highlighter-rouge">HourlyTeamScore</code [...]
 
-<p>Like <code class="highlighter-rouge">UserScore</code>, <code 
class="highlighter-rouge">HourlyTeamScore</code> is best thought of as a job to 
be run periodically after all the relevant data has been gathered (such as once 
per day). The pipeline reads a fixed data set from a file, and writes the 
results to a Google Cloud BigQuery table, just like <code 
class="highlighter-rouge">UserScore</code>.</p>
+<p>Like <code class="highlighter-rouge">UserScore</code>, <code 
class="highlighter-rouge">HourlyTeamScore</code> is best thought of as a job to 
be run periodically after all the relevant data has been gathered (such as once 
per day). The pipeline reads a fixed data set from a file, and writes the 
results to a Google Cloud BigQuery table.</p>
 
 <blockquote class="language-java">
   <p><strong>Note:</strong> See <a 
href="https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java">HourlyTeamScore
 on GitHub</a> for the complete example pipeline program.</p>
@@ -425,7 +417,7 @@
 <figure id="fig3">
     <img src="/images/gaming-example-team-scores-narrow.gif" width="900" 
height="390" alt="Score data for two teams." />
 </figure>
-<p>Figure 3: Score data for two teams. Each team’s scores are divided into 
logical windows based on when those scores occurred in event time.</p>
+<p><strong>Figure 3:</strong> Score data for two teams. Each team’s scores are 
divided into logical windows based on when those scores occurred in event 
time.</p>
 
 <p>Notice that as processing time advances, the sums are now <em>per 
window</em>; each window represents an hour of <em>event time</em> during the 
day in which the scores occurred.</p>
 
@@ -442,21 +434,18 @@
 <p>The following code shows this:</p>
 
 <div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="c1">// Add an element timestamp based on 
the event log, and apply fixed windowing.</span>
-    <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"AddEventTimestamps"</span><span 
class="o">,</span>
-           <span class="n">WithTimestamps</span><span class="o">.</span><span 
class="na">of</span><span class="o">((</span><span 
class="n">GameActionInfo</span> <span class="n">i</span><span 
class="o">)</span> <span class="o">-&gt;</span> <span class="k">new</span> 
<span class="n">Instant</span><span class="o">(</span><span 
class="n">i</span><span class="o">.</span><span 
class="na">getTimestamp</span><span class="o">())))</span>
-    <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"FixedWindowsTeam"</span><span 
class="o">,</span> <span class="n">Window</span><span 
class="o">.&lt;</span><span class="n">GameActionInfo</span><span 
class="o">&gt;</span><span class="n">into</span><span class="o">(</span>
-        <span class="n">FixedWindows</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">standardMinutes</span><span class="o">(</span><span 
class="n">options</span><span class="o">.</span><span 
class="na">getWindowDuration</span><span class="o">()))))</span>
+<span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"AddEventTimestamps"</span><span 
class="o">,</span>
+       <span class="n">WithTimestamps</span><span class="o">.</span><span 
class="na">of</span><span class="o">((</span><span 
class="n">GameActionInfo</span> <span class="n">i</span><span 
class="o">)</span> <span class="o">-&gt;</span> <span class="k">new</span> 
<span class="n">Instant</span><span class="o">(</span><span 
class="n">i</span><span class="o">.</span><span 
class="na">getTimestamp</span><span class="o">())))</span>
+<span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"FixedWindowsTeam"</span><span 
class="o">,</span> <span class="n">Window</span><span 
class="o">.&lt;</span><span class="n">GameActionInfo</span><span 
class="o">&gt;</span><span class="n">into</span><span class="o">(</span>
+    <span class="n">FixedWindows</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">standardMinutes</span><span class="o">(</span><span 
class="n">options</span><span class="o">.</span><span 
class="na">getWindowDuration</span><span class="o">()))))</span>
 </code></pre>
 </div>
-
-<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="c"># Add an element timestamp based on the event log, and apply fixed 
windowing.</span>
-<span class="c"># Convert element['timestamp'] into seconds as expected by 
TimestampedValue.</span>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="c"># Add an element timestamp based on the event log, and apply 
fixed</span>
+<span class="c"># windowing.</span>
 <span class="o">|</span> <span class="s">'AddEventTimestamps'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Map</span><span class="p">(</span>
-    <span class="k">lambda</span> <span class="n">element</span><span 
class="p">:</span> <span class="n">TimestampedValue</span><span 
class="p">(</span>
-        <span class="n">element</span><span class="p">,</span> <span 
class="n">element</span><span class="p">[</span><span 
class="s">'timestamp'</span><span class="p">]</span> <span class="o">/</span> 
<span class="mf">1000.0</span><span class="p">))</span>
-<span class="c"># Convert window_duration into seconds as expected by 
FixedWindows.</span>
-<span class="o">|</span> <span class="s">'FixedWindowsTeam'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">WindowInto</span><span 
class="p">(</span><span class="n">FixedWindows</span><span class="p">(</span>
-    <span class="n">size</span><span class="o">=</span><span 
class="bp">self</span><span class="o">.</span><span 
class="n">window_duration</span> <span class="o">*</span> <span 
class="mi">60</span><span class="p">))</span>
+    <span class="k">lambda</span> <span class="n">elem</span><span 
class="p">:</span> <span class="n">beam</span><span class="o">.</span><span 
class="n">window</span><span class="o">.</span><span 
class="n">TimestampedValue</span><span class="p">(</span><span 
class="n">elem</span><span class="p">,</span> <span class="n">elem</span><span 
class="p">[</span><span class="s">'timestamp'</span><span class="p">]))</span>
+<span class="o">|</span> <span class="s">'FixedWindowsTeam'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">WindowInto</span><span class="p">(</span>
+    <span class="n">beam</span><span class="o">.</span><span 
class="n">window</span><span class="o">.</span><span 
class="n">FixedWindows</span><span class="p">(</span><span 
class="bp">self</span><span class="o">.</span><span 
class="n">window_duration_in_seconds</span><span class="p">))</span>
 </code></pre>
 </div>
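Aside on the arithmetic behind `FixedWindows`: with the default zero offset, an element whose timestamp is `t` seconds is assigned to the window that starts at `t` rounded down to a multiple of the window size. The plain-Python sketch below illustrates only that rule. `assign_fixed_window` is a hypothetical helper, not a Beam API. It assumes timestamps are already in seconds, and it converts the window duration from minutes to seconds the same way the new `window_duration_in_seconds` attribute does.

```python
def assign_fixed_window(timestamp_secs, window_duration_minutes):
    """Return the [start, end) bounds, in seconds, of the fixed window that
    contains timestamp_secs.

    Hypothetical helper for illustration; assumes a zero window offset.
    """
    # Minutes -> seconds, mirroring window_duration_in_seconds in the diff.
    size = window_duration_minutes * 60
    # Round the timestamp down to the nearest multiple of the window size.
    start = timestamp_secs - (timestamp_secs % size)
    return start, start + size


if __name__ == '__main__':
    # With 60-minute windows, an event at 01:30:00 UTC (5400 s) falls in [3600, 7200).
    print(assign_fixed_window(5400, 60))
```

All elements in the same hour-long window share one start value. A downstream per-key sum such as `ExtractAndSumScore` therefore yields one total per team per window.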
 
@@ -480,11 +469,10 @@
         <span class="o">-&gt;</span> <span class="n">gInfo</span><span 
class="o">.</span><span class="na">getTimestamp</span><span class="o">()</span> 
<span class="o">&lt;</span> <span class="n">stopMinTimestamp</span><span 
class="o">.</span><span class="na">getMillis</span><span class="o">()))</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="o">|</span> <span class="s">'FilterStartTime'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Filter</span><span class="p">(</span>
-    <span class="k">lambda</span> <span class="n">element</span><span 
class="p">:</span> <span class="n">element</span><span class="p">[</span><span 
class="s">'timestamp'</span><span class="p">]</span> <span 
class="o">&gt;</span> <span class="n">start_min_filter</span><span 
class="p">)</span>
+    <span class="k">lambda</span> <span class="n">elem</span><span 
class="p">:</span> <span class="n">elem</span><span class="p">[</span><span 
class="s">'timestamp'</span><span class="p">]</span> <span 
class="o">&gt;</span> <span class="bp">self</span><span class="o">.</span><span 
class="n">start_timestamp</span><span class="p">)</span>
 <span class="o">|</span> <span class="s">'FilterEndTime'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Filter</span><span class="p">(</span>
-    <span class="k">lambda</span> <span class="n">element</span><span 
class="p">:</span> <span class="n">element</span><span class="p">[</span><span 
class="s">'timestamp'</span><span class="p">]</span> <span 
class="o">&lt;</span> <span class="n">end_min_filter</span><span 
class="p">)</span>
+    <span class="k">lambda</span> <span class="n">elem</span><span 
class="p">:</span> <span class="n">elem</span><span class="p">[</span><span 
class="s">'timestamp'</span><span class="p">]</span> <span 
class="o">&lt;</span> <span class="bp">self</span><span class="o">.</span><span 
class="n">stop_timestamp</span><span class="p">)</span>
 </code></pre>
 </div>
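The `+` lines above compare against `self.start_timestamp` and `self.stop_timestamp`, which the new `__init__` computes once with `str2timestamp`. That helper's definition is not part of this diff, so the following is a sketch under stated assumptions. It parses the `yyyy-MM-dd-HH-mm` strings described in the `--start_min`/`--stop_min` help text as UTC, and it assumes `elem['timestamp']` is in seconds. It keeps the strict `>` and `<` comparisons of the two `Filter` lambdas. `str2timestamp_sketch` and `in_time_range` are hypothetical names.

```python
import calendar
import time


def str2timestamp_sketch(s, fmt='%Y-%m-%d-%H-%M'):
    """Parse a 'yyyy-MM-dd-HH-mm' string as UTC and return Unix seconds.

    Hypothetical stand-in for the example's str2timestamp helper.
    """
    return calendar.timegm(time.strptime(s, fmt))


def in_time_range(elem, start_timestamp, stop_timestamp):
    """Mirror FilterStartTime / FilterEndTime: keep an element only if its
    'timestamp' (assumed to be in seconds) lies strictly between the bounds."""
    return start_timestamp < elem['timestamp'] < stop_timestamp


if __name__ == '__main__':
    start = str2timestamp_sketch('2017-09-18-00-00')
    stop = str2timestamp_sketch('2017-09-19-00-00')
    events = [{'team': 'red', 'timestamp': start - 1},   # before the range
              {'team': 'blue', 'timestamp': start + 60},  # inside the range
              {'team': 'red', 'timestamp': stop}]         # on the excluded end bound
    print([e['team'] for e in events if in_time_range(e, start, stop)])
```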
 
@@ -531,28 +519,27 @@
     <span class="c1">// Extract and sum teamname/score pairs from the event 
data.</span>
     <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"ExtractTeamScore"</span><span 
class="o">,</span> <span class="k">new</span> <span 
class="n">ExtractAndSumScore</span><span class="o">(</span><span 
class="s">"team"</span><span class="o">))</span>
     <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"WriteTeamScoreSums"</span><span 
class="o">,</span>
-      <span class="k">new</span> <span 
class="n">WriteWindowedToBigQuery</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;(</span><span 
class="n">options</span><span class="o">.</span><span 
class="na">getTableName</span><span class="o">(),</span>
-          <span class="n">configureWindowedTableWrite</span><span 
class="o">()));</span>
+        <span class="k">new</span> <span class="n">WriteToText</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;(</span>
+            <span class="n">options</span><span class="o">.</span><span 
class="na">getOutput</span><span class="o">(),</span>
+            <span class="n">configureOutput</span><span class="o">(),</span>
+            <span class="kc">true</span><span class="o">));</span>
 
   <span class="n">pipeline</span><span class="o">.</span><span 
class="na">run</span><span class="o">().</span><span 
class="na">waitUntilFinish</span><span class="o">();</span>
 <span class="o">}</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="k">class</span> <span class="nc">HourlyTeamScore</span><span 
class="p">(</span><span class="n">beam</span><span class="o">.</span><span 
class="n">PTransform</span><span class="p">):</span>
   <span class="k">def</span> <span class="nf">__init__</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="n">start_min</span><span class="p">,</span> <span 
class="n">stop_min</span><span class="p">,</span> <span 
class="n">window_duration</span><span class="p">):</span>
     <span class="nb">super</span><span class="p">(</span><span 
class="n">HourlyTeamScore</span><span class="p">,</span> <span 
class="bp">self</span><span class="p">)</span><span class="o">.</span><span 
class="n">__init__</span><span class="p">()</span>
-    <span class="bp">self</span><span class="o">.</span><span 
class="n">start_min</span> <span class="o">=</span> <span 
class="n">start_min</span>
-    <span class="bp">self</span><span class="o">.</span><span 
class="n">stop_min</span> <span class="o">=</span> <span 
class="n">stop_min</span>
-    <span class="bp">self</span><span class="o">.</span><span 
class="n">window_duration</span> <span class="o">=</span> <span 
class="n">window_duration</span>
+    <span class="bp">self</span><span class="o">.</span><span 
class="n">start_timestamp</span> <span class="o">=</span> <span 
class="n">str2timestamp</span><span class="p">(</span><span 
class="n">start_min</span><span class="p">)</span>
+    <span class="bp">self</span><span class="o">.</span><span 
class="n">stop_timestamp</span> <span class="o">=</span> <span 
class="n">str2timestamp</span><span class="p">(</span><span 
class="n">stop_min</span><span class="p">)</span>
+    <span class="bp">self</span><span class="o">.</span><span 
class="n">window_duration_in_seconds</span> <span class="o">=</span> <span 
class="n">window_duration</span> <span class="o">*</span> <span 
class="mi">60</span>
 
   <span class="k">def</span> <span class="nf">expand</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="n">pcoll</span><span class="p">):</span>
-    <span class="n">start_min_filter</span> <span class="o">=</span> <span 
class="n">string_to_timestamp</span><span class="p">(</span><span 
class="bp">self</span><span class="o">.</span><span 
class="n">start_min</span><span class="p">)</span>
-    <span class="n">end_min_filter</span> <span class="o">=</span> <span 
class="n">string_to_timestamp</span><span class="p">(</span><span 
class="bp">self</span><span class="o">.</span><span 
class="n">stop_min</span><span class="p">)</span>
-
     <span class="k">return</span> <span class="p">(</span>
         <span class="n">pcoll</span>
-        <span class="o">|</span> <span class="s">'ParseGameEvent'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span 
class="n">ParseEventFn</span><span class="p">())</span>
+        <span class="o">|</span> <span class="s">'ParseGameEventFn'</span> 
<span class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span 
class="n">ParseGameEventFn</span><span class="p">())</span>
+
         <span class="c"># Filter out data before and after the given times so 
that it is not</span>
         <span class="c"># included in the calculations. As we collect data in 
batches (say, by</span>
         <span class="c"># day), the batch for the day that we want to analyze 
could potentially</span>
@@ -561,43 +548,91 @@
         <span class="c"># (to scoop up late-arriving events from the day we're 
analyzing), we</span>
         <span class="c"># need to weed out events that fall after the time 
period we want to</span>
         <span class="c"># analyze.</span>
+        <span class="c"># [START filter_by_time_range]</span>
         <span class="o">|</span> <span class="s">'FilterStartTime'</span> 
<span class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Filter</span><span class="p">(</span>
-            <span class="k">lambda</span> <span class="n">element</span><span 
class="p">:</span> <span class="n">element</span><span class="p">[</span><span 
class="s">'timestamp'</span><span class="p">]</span> <span 
class="o">&gt;</span> <span class="n">start_min_filter</span><span 
class="p">)</span>
+            <span class="k">lambda</span> <span class="n">elem</span><span 
class="p">:</span> <span class="n">elem</span><span class="p">[</span><span 
class="s">'timestamp'</span><span class="p">]</span> <span 
class="o">&gt;</span> <span class="bp">self</span><span class="o">.</span><span 
class="n">start_timestamp</span><span class="p">)</span>
         <span class="o">|</span> <span class="s">'FilterEndTime'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Filter</span><span class="p">(</span>
-            <span class="k">lambda</span> <span class="n">element</span><span 
class="p">:</span> <span class="n">element</span><span class="p">[</span><span 
class="s">'timestamp'</span><span class="p">]</span> <span 
class="o">&lt;</span> <span class="n">end_min_filter</span><span 
class="p">)</span>
+            <span class="k">lambda</span> <span class="n">elem</span><span 
class="p">:</span> <span class="n">elem</span><span class="p">[</span><span 
class="s">'timestamp'</span><span class="p">]</span> <span 
class="o">&lt;</span> <span class="bp">self</span><span class="o">.</span><span 
class="n">stop_timestamp</span><span class="p">)</span>
+        <span class="c"># [END filter_by_time_range]</span>
+
+        <span class="c"># [START add_timestamp_and_window]</span>
         <span class="c"># Add an element timestamp based on the event log, and 
apply fixed</span>
         <span class="c"># windowing.</span>
-        <span class="c"># Convert element['timestamp'] into seconds as 
expected by</span>
-        <span class="c"># TimestampedValue.</span>
         <span class="o">|</span> <span class="s">'AddEventTimestamps'</span> 
<span class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Map</span><span class="p">(</span>
-            <span class="k">lambda</span> <span class="n">element</span><span 
class="p">:</span> <span class="n">TimestampedValue</span><span 
class="p">(</span>
-                <span class="n">element</span><span class="p">,</span> <span 
class="n">element</span><span class="p">[</span><span 
class="s">'timestamp'</span><span class="p">]</span> <span class="o">/</span> 
<span class="mf">1000.0</span><span class="p">))</span>
-        <span class="c"># Convert window_duration into seconds as expected by 
FixedWindows.</span>
-        <span class="o">|</span> <span class="s">'FixedWindowsTeam'</span> 
<span class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">WindowInto</span><span 
class="p">(</span><span class="n">FixedWindows</span><span class="p">(</span>
-            <span class="n">size</span><span class="o">=</span><span 
class="bp">self</span><span class="o">.</span><span 
class="n">window_duration</span> <span class="o">*</span> <span 
class="mi">60</span><span class="p">))</span>
+            <span class="k">lambda</span> <span class="n">elem</span><span 
class="p">:</span> <span class="n">beam</span><span class="o">.</span><span 
class="n">window</span><span class="o">.</span><span 
class="n">TimestampedValue</span><span class="p">(</span><span 
class="n">elem</span><span class="p">,</span> <span class="n">elem</span><span 
class="p">[</span><span class="s">'timestamp'</span><span class="p">]))</span>
+        <span class="o">|</span> <span class="s">'FixedWindowsTeam'</span> 
<span class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">WindowInto</span><span class="p">(</span>
+            <span class="n">beam</span><span class="o">.</span><span 
class="n">window</span><span class="o">.</span><span 
class="n">FixedWindows</span><span class="p">(</span><span 
class="bp">self</span><span class="o">.</span><span 
class="n">window_duration_in_seconds</span><span class="p">))</span>
+        <span class="c"># [END add_timestamp_and_window]</span>
+
         <span class="c"># Extract and sum teamname/score pairs from the event 
data.</span>
-        <span class="o">|</span> <span class="s">'ExtractTeamScore'</span> 
<span class="o">&gt;&gt;</span> <span class="n">ExtractAndSumScore</span><span 
class="p">(</span><span class="s">'team'</span><span class="p">))</span>
+        <span class="o">|</span> <span class="s">'ExtractAndSumScore'</span> 
<span class="o">&gt;&gt;</span> <span class="n">ExtractAndSumScore</span><span 
class="p">(</span><span class="s">'team'</span><span class="p">))</span>
 
 
 <span class="k">def</span> <span class="nf">run</span><span 
class="p">(</span><span class="n">argv</span><span class="o">=</span><span 
class="bp">None</span><span class="p">):</span>
   <span class="s">"""Main entry point; defines and runs the hourly_team_score 
pipeline."""</span>
-  <span class="o">...</span>
-
-  <span class="n">known_args</span><span class="p">,</span> <span 
class="n">pipeline_args</span> <span class="o">=</span> <span 
class="n">parser</span><span class="o">.</span><span 
class="n">parse_known_args</span><span class="p">(</span><span 
class="n">argv</span><span class="p">)</span>
-
-  <span class="n">pipeline_options</span> <span class="o">=</span> <span 
class="n">PipelineOptions</span><span class="p">(</span><span 
class="n">pipeline_args</span><span class="p">)</span>
-  <span class="n">p</span> <span class="o">=</span> <span 
class="n">beam</span><span class="o">.</span><span 
class="n">Pipeline</span><span class="p">(</span><span 
class="n">options</span><span class="o">=</span><span 
class="n">pipeline_options</span><span class="p">)</span>
-  <span class="n">pipeline_options</span><span class="o">.</span><span 
class="n">view_as</span><span class="p">(</span><span 
class="n">SetupOptions</span><span class="p">)</span><span 
class="o">.</span><span class="n">save_main_session</span> <span 
class="o">=</span> <span class="bp">True</span>
-
-  <span class="p">(</span><span class="n">p</span>  <span class="c"># pylint: 
disable=expression-not-assigned</span>
-   <span class="o">|</span> <span class="n">ReadFromText</span><span 
class="p">(</span><span class="n">known_args</span><span 
class="o">.</span><span class="nb">input</span><span class="p">)</span>
-   <span class="o">|</span> <span class="n">HourlyTeamScore</span><span 
class="p">(</span>
-       <span class="n">known_args</span><span class="o">.</span><span 
class="n">start_min</span><span class="p">,</span> <span 
class="n">known_args</span><span class="o">.</span><span 
class="n">stop_min</span><span class="p">,</span> <span 
class="n">known_args</span><span class="o">.</span><span 
class="n">window_duration</span><span class="p">)</span>
-   <span class="o">|</span> <span 
class="n">WriteWindowedToBigQuery</span><span class="p">(</span>
-       <span class="n">known_args</span><span class="o">.</span><span 
class="n">table_name</span><span class="p">,</span> <span 
class="n">known_args</span><span class="o">.</span><span 
class="n">dataset</span><span class="p">,</span> <span 
class="n">configure_bigquery_write</span><span class="p">()))</span>
-
-  <span class="n">result</span> <span class="o">=</span> <span 
class="n">p</span><span class="o">.</span><span class="n">run</span><span 
class="p">()</span>
-  <span class="n">result</span><span class="o">.</span><span 
class="n">wait_until_finish</span><span class="p">()</span>
+  <span class="n">parser</span> <span class="o">=</span> <span 
class="n">argparse</span><span class="o">.</span><span 
class="n">ArgumentParser</span><span class="p">()</span>
+
+  <span class="c"># The default maps to two large Google Cloud Storage files 
(each ~12GB)</span>
+  <span class="c"># holding two subsequent days' worth (roughly) of 
data.</span>
+  <span class="n">parser</span><span class="o">.</span><span 
class="n">add_argument</span><span class="p">(</span><span 
class="s">'--input'</span><span class="p">,</span>
+                      <span class="nb">type</span><span 
class="o">=</span><span class="nb">str</span><span class="p">,</span>
+                      <span class="n">default</span><span 
class="o">=</span><span 
class="s">'gs://apache-beam-samples/game/gaming_data*.csv'</span><span 
class="p">,</span>
+                      <span class="n">help</span><span class="o">=</span><span 
class="s">'Path to the data file(s) containing game data.'</span><span 
class="p">)</span>
+  <span class="n">parser</span><span class="o">.</span><span 
class="n">add_argument</span><span class="p">(</span><span 
class="s">'--dataset'</span><span class="p">,</span>
+                      <span class="nb">type</span><span 
class="o">=</span><span class="nb">str</span><span class="p">,</span>
+                      <span class="n">required</span><span 
class="o">=</span><span class="bp">True</span><span class="p">,</span>
+                      <span class="n">help</span><span class="o">=</span><span 
class="s">'BigQuery Dataset to write tables to. '</span>
+                      <span class="s">'Must already exist.'</span><span 
class="p">)</span>
+  <span class="n">parser</span><span class="o">.</span><span 
class="n">add_argument</span><span class="p">(</span><span 
class="s">'--table_name'</span><span class="p">,</span>
+                      <span class="n">default</span><span 
class="o">=</span><span class="s">'leader_board'</span><span class="p">,</span>
+                      <span class="n">help</span><span class="o">=</span><span 
class="s">'The BigQuery table name. Should not already exist.'</span><span 
class="p">)</span>
+  <span class="n">parser</span><span class="o">.</span><span 
class="n">add_argument</span><span class="p">(</span><span 
class="s">'--window_duration'</span><span class="p">,</span>
+                      <span class="nb">type</span><span 
class="o">=</span><span class="nb">int</span><span class="p">,</span>
+                      <span class="n">default</span><span 
class="o">=</span><span class="mi">60</span><span class="p">,</span>
+                      <span class="n">help</span><span class="o">=</span><span 
class="s">'Numeric value of fixed window duration, in minutes'</span><span 
class="p">)</span>
+  <span class="n">parser</span><span class="o">.</span><span 
class="n">add_argument</span><span class="p">(</span><span 
class="s">'--start_min'</span><span class="p">,</span>
+                      <span class="nb">type</span><span 
class="o">=</span><span class="nb">str</span><span class="p">,</span>
+                      <span class="n">default</span><span 
class="o">=</span><span class="s">'1970-01-01-00-00'</span><span 
class="p">,</span>
+                      <span class="n">help</span><span class="o">=</span><span 
class="s">'String representation of the first minute after '</span>
+                           <span class="s">'which to generate results in the 
format: '</span>
+                           <span class="s">'yyyy-MM-dd-HH-mm. Any input data 
timestamped '</span>
+                           <span class="s">'prior to that minute 
won</span><span class="se">\'</span><span class="s">t be included in the 
'</span>
+                           <span class="s">'sums.'</span><span 
class="p">)</span>
+  <span class="n">parser</span><span class="o">.</span><span 
class="n">add_argument</span><span class="p">(</span><span 
class="s">'--stop_min'</span><span class="p">,</span>
+                      <span class="nb">type</span><span 
class="o">=</span><span class="nb">str</span><span class="p">,</span>
+                      <span class="n">default</span><span 
class="o">=</span><span class="s">'2100-01-01-00-00'</span><span 
class="p">,</span>
+                      <span class="n">help</span><span class="o">=</span><span 
class="s">'String representation of the first minute for '</span>
+                           <span class="s">'which to generate results in the 
format: '</span>
+                           <span class="s">'yyyy-MM-dd-HH-mm. Any input data 
timestamped '</span>
+                           <span class="s">'after that minute 
won</span><span class="se">\'</span><span class="s">t be included in the 
'</span>
+                           <span class="s">'sums.'</span><span 
class="p">)</span>
+
+  <span class="n">args</span><span class="p">,</span> <span 
class="n">pipeline_args</span> <span class="o">=</span> <span 
class="n">parser</span><span class="o">.</span><span 
class="n">parse_known_args</span><span class="p">(</span><span 
class="n">argv</span><span class="p">)</span>
+
+  <span class="n">options</span> <span class="o">=</span> <span 
class="n">PipelineOptions</span><span class="p">(</span><span 
class="n">pipeline_args</span><span class="p">)</span>
+
+  <span class="c"># We also require the --project option to access 
--dataset</span>
+  <span class="k">if</span> <span class="n">options</span><span 
class="o">.</span><span class="n">view_as</span><span class="p">(</span><span 
class="n">GoogleCloudOptions</span><span class="p">)</span><span 
class="o">.</span><span class="n">project</span> <span class="ow">is</span> 
<span class="bp">None</span><span class="p">:</span>
+    <span class="n">parser</span><span class="o">.</span><span 
class="n">print_usage</span><span class="p">()</span>
+    <span class="k">print</span><span class="p">(</span><span 
class="n">sys</span><span class="o">.</span><span class="n">argv</span><span 
class="p">[</span><span class="mi">0</span><span class="p">]</span> <span 
class="o">+</span> <span class="s">': error: argument --project is 
required'</span><span class="p">)</span>
+    <span class="n">sys</span><span class="o">.</span><span 
class="nb">exit</span><span class="p">(</span><span class="mi">1</span><span 
class="p">)</span>
+
+  <span class="c"># We use the save_main_session option because one or more 
DoFns in this</span>
+  <span class="c"># workflow rely on global context (e.g., a module imported 
at module level).</span>
+  <span class="n">options</span><span class="o">.</span><span 
class="n">view_as</span><span class="p">(</span><span 
class="n">SetupOptions</span><span class="p">)</span><span 
class="o">.</span><span class="n">save_main_session</span> <span 
class="o">=</span> <span class="bp">True</span>
+
+  <span class="k">with</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Pipeline</span><span class="p">(</span><span 
class="n">options</span><span class="o">=</span><span 
class="n">options</span><span class="p">)</span> <span class="k">as</span> 
<span class="n">p</span><span class="p">:</span>
+    <span class="p">(</span><span class="n">p</span>  <span class="c"># 
pylint: disable=expression-not-assigned</span>
+     <span class="o">|</span> <span class="s">'ReadInputText'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">io</span><span class="o">.</span><span 
class="n">ReadFromText</span><span class="p">(</span><span 
class="n">args</span><span class="o">.</span><span class="nb">input</span><span 
class="p">)</span>
+     <span class="o">|</span> <span class="s">'HourlyTeamScore'</span> <span 
class="o">&gt;&gt;</span> <span class="n">HourlyTeamScore</span><span 
class="p">(</span>
+         <span class="n">args</span><span class="o">.</span><span 
class="n">start_min</span><span class="p">,</span> <span 
class="n">args</span><span class="o">.</span><span 
class="n">stop_min</span><span class="p">,</span> <span 
class="n">args</span><span class="o">.</span><span 
class="n">window_duration</span><span class="p">)</span>
+     <span class="o">|</span> <span class="s">'TeamScoresDict'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span 
class="n">TeamScoresDict</span><span class="p">())</span>
+     <span class="o">|</span> <span class="s">'WriteTeamScoreSums'</span> 
<span class="o">&gt;&gt;</span> <span class="n">WriteToBigQuery</span><span 
class="p">(</span>
+         <span class="n">args</span><span class="o">.</span><span 
class="n">table_name</span><span class="p">,</span> <span 
class="n">args</span><span class="o">.</span><span 
class="n">dataset</span><span class="p">,</span> <span class="p">{</span>
+             <span class="s">'team'</span><span class="p">:</span> <span 
class="s">'STRING'</span><span class="p">,</span>
+             <span class="s">'total_score'</span><span class="p">:</span> 
<span class="s">'INTEGER'</span><span class="p">,</span>
+             <span class="s">'window_start'</span><span class="p">:</span> 
<span class="s">'STRING'</span><span class="p">,</span>
+         <span class="p">}))</span>
 </code></pre>
 </div>
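The rewritten `run()` relies on `argparse`'s `parse_known_args`. Flags the parser declares end up in `args`. Everything it does not recognize (for example `--project` or `--runner`) is returned in `pipeline_args`, which is then passed to `PipelineOptions`. The standard-library sketch below shows that split using a trimmed copy of the parser. `build_parser` and the flag values are illustrative and are not taken from the example.

```python
import argparse


def build_parser():
    """Trimmed-down copy of the parser in run(); only three of its flags."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--dataset', type=str, required=True)
    parser.add_argument('--table_name', default='leader_board')
    parser.add_argument('--window_duration', type=int, default=60)
    return parser


if __name__ == '__main__':
    args, pipeline_args = build_parser().parse_known_args(
        ['--dataset=game_stats', '--project=my-project', '--runner=DirectRunner'])
    # Declared flags are parsed (with defaults filled in) ...
    print(args.dataset, args.table_name, args.window_duration)
    # ... while unrecognized flags are passed through untouched.
    print(pipeline_args)
```

This split also explains the `--project` check. The parser does not declare `--project`, so the new code reads it through `options.view_as(GoogleCloudOptions)` rather than from `args`.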
 
@@ -611,27 +646,27 @@
 
 <h2 
id="leaderboard-streaming-processing-with-real-time-game-data">LeaderBoard: 
Streaming Processing with Real-Time Game Data</h2>
 
-<blockquote>
-  <p><strong>Note:</strong> This example currently exists in Java only.</p>
-</blockquote>
-
 <p>One way we can help address the latency issue present in the <code 
class="highlighter-rouge">UserScore</code> and <code 
class="highlighter-rouge">HourlyTeamScore</code> pipelines is by reading the 
score data from an unbounded source. The <code 
class="highlighter-rouge">LeaderBoard</code> pipeline introduces streaming 
processing by reading the game score data from an unbounded source that 
produces an infinite amount of data, rather than from a file on the game 
server.</p>
 
 <p>The <code class="highlighter-rouge">LeaderBoard</code> pipeline also 
demonstrates how to process game score data with respect to both <em>processing 
time</em> and <em>event time</em>. <code 
class="highlighter-rouge">LeaderBoard</code> outputs data about both individual 
user scores and about team scores, each with respect to a different time 
frame.</p>
 
-<p>Because the <code class="highlighter-rouge">LeaderBoard</code> pipeline 
reads the game data from an unbounded source as that data is generated, you can 
think of the pipeline as an ongoing job running concurrently with the game 
process. <code class="highlighter-rouge">LeaderBoard</code> can thus provide 
low-latency insights into how users are playing the game at any given 
moment—useful if, for example, we want to provide a live web-based scoreboard 
so that users can track their progres [...]
+<p>Because the <code class="highlighter-rouge">LeaderBoard</code> pipeline 
reads the game data from an unbounded source as that data is generated, you can 
think of the pipeline as an ongoing job running concurrently with the game 
process. <code class="highlighter-rouge">LeaderBoard</code> can thus provide 
low-latency insights into how users are playing the game at any given moment — 
useful if, for example, we want to provide a live web-based scoreboard so that 
users can track their progr [...]
 
-<blockquote>
+<blockquote class="language-java">
   <p><strong>Note:</strong> See <a 
href="https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java">LeaderBoard
 on GitHub</a> for the complete example pipeline program.</p>
 </blockquote>
 
+<blockquote class="language-py">
+  <p><strong>Note:</strong> See <a 
href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/leader_board.py">LeaderBoard
 on GitHub</a> for the complete example pipeline program.</p>
+</blockquote>
+
 <h3 id="what-does-leaderboard-do">What Does LeaderBoard Do?</h3>
 
 <p>The <code class="highlighter-rouge">LeaderBoard</code> pipeline reads game 
data published to an unbounded source that produces an infinite amount of data 
in near real-time, and uses that data to perform two separate processing 
tasks:</p>
 
 <ul>
   <li>
-    <p><code class="highlighter-rouge">LeaderBoard</code> calculates the total 
score for every unique user and publishes speculative results for every ten 
minutes of <em>processing time</em>. That is, every ten minutes, the pipeline 
outputs the total score per user that the pipeline has processed to date. This 
calculation provides a running “leader board” in close to real time, regardless 
of when the actual game events were generated.</p>
+    <p><code class="highlighter-rouge">LeaderBoard</code> calculates the total 
score for every unique user and publishes speculative results for every ten 
minutes of <em>processing time</em>. That is, ten minutes after data is 
received, the pipeline outputs the total score per user that the pipeline has 
processed to date. This calculation provides a running “leader board” in close 
to real time, regardless of when the actual game events were generated.</p>
   </li>
   <li>
     <p><code class="highlighter-rouge">LeaderBoard</code> calculates the team 
scores for each hour that the pipeline runs. This is useful if we want to, for 
example, reward the top-scoring team for each hour of play. The team score 
calculation uses fixed-time windowing to divide the input data into hour-long 
finite windows based on the <em>event time</em> (indicated by the timestamp) as 
data arrives in the pipeline.</p>
@@ -644,16 +679,16 @@
 
 <h4 id="calculating-user-score-based-on-processing-time">Calculating User 
Score based on Processing Time</h4>
 
-<p>We want our pipeline to output a running total score for each user for 
every ten minutes that the pipeline runs. This calculation doesn’t consider 
<em>when</em> the actual score was generated by the user’s play instance; it 
simply outputs the sum of all the scores for that user that have arrived in the 
pipeline to date. Late data gets included in the calculation whenever it 
happens to arrive in the pipeline as it’s running.</p>
+<p>We want our pipeline to output a running total score for each user for 
every ten minutes of processing time. This calculation doesn’t consider 
<em>when</em> the actual score was generated by the user’s play instance; it 
simply outputs the sum of all the scores for that user that have arrived in the 
pipeline to date. Late data gets included in the calculation whenever it 
happens to arrive in the pipeline as it’s running.</p>
 
 <p>Because we want all the data that has arrived in the pipeline every time we 
update our calculation, we have the pipeline consider all of the user score 
data in a <strong>single global window</strong>. The single global window is 
unbounded, but we can specify a kind of temporary cut-off point for each 
ten-minute calculation by using a processing time <a 
href="/documentation/programming-guide/#triggers">trigger</a>.</p>
 
-<p>When we specify a ten-minute processing time trigger for the single global 
window, the pipeline effectively takes a “snapshot” of the contents of the 
window every time the trigger fires. This snapshot happens at ten-minute 
intervals as long as data has arrived. If no data has arrived, the pipeline 
will take its next “snapshot” 10 minutes past an element arriving. Since we’re 
using a single global window, each snapshot contains all the data collected 
<em>to that point in time</em>. The [...]
+<p>When we specify a ten-minute processing time trigger for the single global 
window, the pipeline effectively takes a “snapshot” of the contents of the 
window every time the trigger fires. This snapshot happens after ten minutes 
have passed since data was received. If no data has arrived, the pipeline takes 
its next “snapshot” 10 minutes after an element arrives. Since we’re using a 
single global window, each snapshot contains all the data collected <em>to that 
point in time</em>. The f [...]
 
 <figure id="fig4">
-    <img src="/images/gaming-example-proc-time-narrow.gif" width="900" 
height="263" alt="Score data for for three users." />
+    <img src="/images/gaming-example-proc-time-narrow.gif" width="900" 
height="263" alt="Score data for three users." />
 </figure>
-<p>Figure 4: Score data for for three users. Each user’s scores are grouped 
together in a single global window, with a trigger that generates a snapshot 
for output every ten minutes.</p>
+<p><strong>Figure 4:</strong> Score data for three users. Each user’s scores 
are grouped together in a single global window, with a trigger that generates a 
snapshot for output ten minutes after data is received.</p>
 
 <p>As processing time advances and more scores are processed, the trigger 
outputs the updated sum for each user.</p>
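To make the processing-time behavior concrete, here is a minimal plain-Python simulation (not the Beam API) of a single global window with a repeated trigger that fires ten minutes after the first element of each pane arrives, in accumulating mode. The function name `simulate_global_window` and the `(arrival_minute, user, score)` event layout are hypothetical, chosen only for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# Hypothetical event layout: (processing-time arrival in minutes, user, score).
Event = Tuple[float, str, int]


def simulate_global_window(
    events: List[Event], delay: float = 10.0
) -> List[Tuple[float, Dict[str, int]]]:
    """Return (fire_time, running totals per user) for each trigger firing.

    Sketch only: one global window, a trigger that fires `delay` minutes after
    the first element of a pane arrives, and accumulating panes.
    """
    totals: Dict[str, int] = defaultdict(int)
    firings: List[Tuple[float, Dict[str, int]]] = []
    fire_at: Optional[float] = None  # pending firing time for the current pane
    for arrival, user, score in sorted(events):
        # If the delay elapsed before this element arrived, the pane fired first.
        if fire_at is not None and arrival >= fire_at:
            firings.append((fire_at, dict(totals)))
            fire_at = None
        # The first element of a new pane starts the ten-minute clock.
        if fire_at is None:
            fire_at = arrival + delay
        totals[user] += score  # accumulating: totals are never reset
    if fire_at is not None:
        firings.append((fire_at, dict(totals)))
    return firings
```

Each snapshot contains everything seen so far, which mirrors the "running total" output described above; with no new data, no new firing is scheduled.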
 
@@ -687,12 +722,35 @@
 <span class="o">}</span>
 </code></pre>
 </div>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="k">class</span> <span class="nc">CalculateUserScores</span><span 
class="p">(</span><span class="n">beam</span><span class="o">.</span><span 
class="n">PTransform</span><span class="p">):</span>
+  <span class="s">"""Extract user/score pairs from the event stream using 
processing time, via
+  global windowing. Get periodic updates on all users' running scores.
+  """</span>
+  <span class="k">def</span> <span class="nf">__init__</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="n">allowed_lateness</span><span class="p">):</span>
+    <span class="nb">super</span><span class="p">(</span><span 
class="n">CalculateUserScores</span><span class="p">,</span> <span 
class="bp">self</span><span class="p">)</span><span class="o">.</span><span 
class="n">__init__</span><span class="p">()</span>
+    <span class="bp">self</span><span class="o">.</span><span 
class="n">allowed_lateness_seconds</span> <span class="o">=</span> <span 
class="n">allowed_lateness</span> <span class="o">*</span> <span 
class="mi">60</span>
+
+  <span class="k">def</span> <span class="nf">expand</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="n">pcoll</span><span class="p">):</span>
+    <span class="c"># NOTE: the behavior does not exactly match the Java 
example</span>
+    <span class="c"># TODO: allowed_lateness not implemented yet in 
FixedWindows</span>
+    <span class="c"># TODO: AfterProcessingTime not implemented yet, replace 
AfterCount</span>
+    <span class="k">return</span> <span class="p">(</span>
+        <span class="n">pcoll</span>
+        <span class="c"># Get periodic results every ten events.</span>
+        <span class="o">|</span> <span 
class="s">'LeaderboardUserGlobalWindows'</span> <span class="o">&gt;&gt;</span> 
<span class="n">beam</span><span class="o">.</span><span 
class="n">WindowInto</span><span class="p">(</span>
+            <span class="n">beam</span><span class="o">.</span><span 
class="n">window</span><span class="o">.</span><span 
class="n">GlobalWindows</span><span class="p">(),</span>
+            <span class="n">trigger</span><span class="o">=</span><span 
class="n">trigger</span><span class="o">.</span><span 
class="n">Repeatedly</span><span class="p">(</span><span 
class="n">trigger</span><span class="o">.</span><span 
class="n">AfterCount</span><span class="p">(</span><span 
class="mi">10</span><span class="p">)),</span>
+            <span class="n">accumulation_mode</span><span 
class="o">=</span><span class="n">trigger</span><span class="o">.</span><span 
class="n">AccumulationMode</span><span class="o">.</span><span 
class="n">ACCUMULATING</span><span class="p">)</span>
+        <span class="c"># Extract and sum username/score pairs from the event 
data.</span>
+        <span class="o">|</span> <span class="s">'ExtractAndSumScore'</span> 
<span class="o">&gt;&gt;</span> <span class="n">ExtractAndSumScore</span><span 
class="p">(</span><span class="s">'user'</span><span class="p">))</span>
+</code></pre>
+</div>
 
-<p>Note that <code class="highlighter-rouge">LeaderBoard</code> uses an 
accumulating trigger for the user score calculation (by invoking <code 
class="highlighter-rouge">.accumulatingFiredPanes</code> when setting the 
trigger). Using an accumulating trigger causes the pipeline to accumulate the 
previously emitted data together with any new data that’s arrived since the 
last trigger fire. This ensures that <code 
class="highlighter-rouge">LeaderBoard</code> a running sum for the user scores 
[...]
+<p><code class="highlighter-rouge">LeaderBoard</code> sets the <a 
href="/documentation/programming-guide/#window-accumulation-modes">window 
accumulation mode</a> to accumulate window panes as the trigger fires. This 
accumulation mode is set by <span class="language-java">invoking <code 
class="highlighter-rouge">.accumulatingFiredPanes</code></span> <span 
class="language-py">using <code 
class="highlighter-rouge">accumulation_mode=trigger.AccumulationMode.ACCUMULATING</code></span>
 when se [...]
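The difference between accumulating and discarding panes can be sketched in plain Python. This is a hypothetical helper, `fire_panes`, modeled loosely on a repeated count trigger for a single key (like the `AfterCount` trigger in the Python snippet); it is not Beam code, and elements left over after the last firing are simply not emitted.

```python
from typing import List


def fire_panes(scores: List[int], every: int, accumulating: bool) -> List[int]:
    """Emit one pane per `every` elements for a single key.

    accumulating=True  -> each pane reports the sum of everything seen so far.
    accumulating=False -> each pane reports only the elements since the last firing.
    """
    panes: List[int] = []
    pane_sum = 0       # contents of the current (not yet fired) pane
    emitted_total = 0  # sum of all fired panes, used in accumulating mode
    for i, score in enumerate(scores, start=1):
        pane_sum += score
        if i % every == 0:
            emitted_total += pane_sum
            panes.append(emitted_total if accumulating else pane_sum)
            pane_sum = 0
    return panes
```

With accumulating panes, the last pane already holds the complete running sum, which is why a leaderboard that only needs the latest total per user can overwrite earlier results.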
 
 <h4 id="calculating-team-score-based-on-event-time">Calculating Team Score 
based on Event Time</h4>
 
-<p>We want our pipeline to also output the total score for each team during 
each hour of play. Unlike the user score calculation, for team scores, we care 
about when in <em>event</em> time each score actually occurred, because we want 
to consider each hour of play individually. We also want to provide speculative 
updates as each individual hour progresses, and to allow any instances of late 
data—data that arrives after a given hour’s data is considered complete—to be 
included in our calc [...]
+<p>We want our pipeline to also output the total score for each team during 
each hour of play. Unlike the user score calculation, for team scores, we care 
about when in <em>event</em> time each score actually occurred, because we want 
to consider each hour of play individually. We also want to provide speculative 
updates as each individual hour progresses, and to allow any instances of late 
data — data that arrives after a given hour’s data is considered complete — to 
be included in our  [...]
 
 <p>Because we consider each hour individually, we can apply fixed-time 
windowing to our input data, just like in <code 
class="highlighter-rouge">HourlyTeamScore</code>. To provide the speculative 
updates and updates on late data, we’ll specify additional trigger parameters. 
The trigger will cause each window to calculate and emit results at an interval 
we specify (in this case, every five minutes), and also to keep triggering 
after the window is considered “complete” to account for late  [...]
 
@@ -705,15 +763,16 @@
 <figure id="fig5">
     <img src="/images/gaming-example-event-time-narrow.gif" width="900" 
height="390" alt="Score data by team, windowed by event time." />
 </figure>
-<p>Figure 5: Score data by team, windowed by event time. A trigger based on 
processing time causes the window to emit speculative early results and include 
late results.</p>
+<p><strong>Figure 5:</strong> Score data by team, windowed by event time. A 
trigger based on processing time causes the window to emit speculative early 
results and include late results.</p>
 
 <p>The dotted line in the diagram is the “ideal” <strong>watermark</strong>: 
Beam’s notion of when all data in a given window can reasonably be considered 
to have arrived. The irregular solid line represents the actual watermark, as 
determined by the data source.</p>
 
-<p>Data arriving above the solid watermark line is <em>late data</em>—this is 
a score event that was delayed (perhaps generated offline) and arrived after 
the window to which it belongs had closed. Our pipeline’s late-firing trigger 
ensures that this late data is still included in the sum.</p>
+<p>Data arriving above the solid watermark line is <em>late data</em> — this 
is a score event that was delayed (perhaps generated offline) and arrived after 
the window to which it belongs had closed. Our pipeline’s late-firing trigger 
ensures that this late data is still included in the sum.</p>
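A rough plain-Python model of how a fixed hourly window treats on-time, late, and droppable data. It assumes each event carries the watermark observed when it arrived, and uses a simplified lateness rule; the names `window_start` and `window_sums` are hypothetical and this is not how Beam computes lateness internally.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

HOUR = 60  # window size in minutes

# Hypothetical event layout: (event_time_minute, watermark_at_arrival_minute, score).
Event = Tuple[int, int, int]


def window_start(event_minute: int, size: int = HOUR) -> int:
    """Start of the fixed (tumbling) window that contains event_minute."""
    return event_minute - event_minute % size


def window_sums(events: List[Event], allowed_lateness: int, size: int = HOUR):
    """Split each window's score into on-time and late sums, and list dropped events."""
    on_time: Dict[int, int] = defaultdict(int)
    late: Dict[int, int] = defaultdict(int)
    dropped: List[Event] = []
    for event_time, watermark, score in events:
        start = window_start(event_time, size)
        end = start + size
        if watermark < end:
            on_time[start] += score  # watermark had not yet passed the window end
        elif watermark < end + allowed_lateness:
            late[start] += score  # late, but still within allowed lateness
        else:
            dropped.append((event_time, watermark, score))  # too late to include
    return dict(on_time), dict(late), dropped
```

In the pipeline, the late sums would be folded into the window's total by the late-firing trigger rather than reported separately; the split here only makes the classification visible.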
 
 <p>The following code example shows how <code 
class="highlighter-rouge">LeaderBoard</code> applies fixed-time windowing with 
the appropriate triggers to have our pipeline perform the calculations we 
want:</p>
 
 <div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="c1">// Extract team/score pairs from the 
event stream, using hour-long windows by default.</span>
+<span class="nd">@VisibleForTesting</span>
 <span class="kd">static</span> <span class="kd">class</span> <span 
class="nc">CalculateTeamScores</span>
     <span class="kd">extends</span> <span class="n">PTransform</span><span 
class="o">&lt;</span><span class="n">PCollection</span><span 
class="o">&lt;</span><span class="n">GameActionInfo</span><span 
class="o">&gt;,</span> <span class="n">PCollection</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;&gt;</span> <span 
class="o">{</span>
   <span class="kd">private</span> <span class="kd">final</span> <span 
class="n">Duration</span> <span class="n">teamWindowDuration</span><span 
class="o">;</span>
@@ -743,23 +802,51 @@
 <span class="o">}</span>
 </code></pre>
 </div>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="k">class</span> <span class="nc">CalculateTeamScores</span><span 
class="p">(</span><span class="n">beam</span><span class="o">.</span><span 
class="n">PTransform</span><span class="p">):</span>
+  <span class="s">"""Calculates scores for each team within the configured 
window duration.
+
+  Extract team/score pairs from the event stream, using hour-long windows by
+  default.
+  """</span>
+  <span class="k">def</span> <span class="nf">__init__</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="n">team_window_duration</span><span class="p">,</span> <span 
class="n">allowed_lateness</span><span class="p">):</span>
+    <span class="nb">super</span><span class="p">(</span><span 
class="n">CalculateTeamScores</span><span class="p">,</span> <span 
class="bp">self</span><span class="p">)</span><span class="o">.</span><span 
class="n">__init__</span><span class="p">()</span>
+    <span class="bp">self</span><span class="o">.</span><span 
class="n">team_window_duration</span> <span class="o">=</span> <span 
class="n">team_window_duration</span> <span class="o">*</span> <span 
class="mi">60</span>
+    <span class="bp">self</span><span class="o">.</span><span 
class="n">allowed_lateness_seconds</span> <span class="o">=</span> <span 
class="n">allowed_lateness</span> <span class="o">*</span> <span 
class="mi">60</span>
+
+  <span class="k">def</span> <span class="nf">expand</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="n">pcoll</span><span class="p">):</span>
+    <span class="c"># NOTE: the behavior does not exactly match the Java 
example</span>
+    <span class="c"># TODO: allowed_lateness not implemented yet in 
FixedWindows</span>
+    <span class="c"># TODO: AfterProcessingTime not implemented yet, replace 
AfterCount</span>
+    <span class="k">return</span> <span class="p">(</span>
+        <span class="n">pcoll</span>
+        <span class="c"># We will get early (speculative) results as well as 
cumulative</span>
+        <span class="c"># processing of late data.</span>
+        <span class="o">|</span> <span 
class="s">'LeaderboardTeamFixedWindows'</span> <span class="o">&gt;&gt;</span> 
<span class="n">beam</span><span class="o">.</span><span 
class="n">WindowInto</span><span class="p">(</span>
+            <span class="n">beam</span><span class="o">.</span><span 
class="n">window</span><span class="o">.</span><span 
class="n">FixedWindows</span><span class="p">(</span><span 
class="bp">self</span><span class="o">.</span><span 
class="n">team_window_duration</span><span class="p">),</span>
+            <span class="n">trigger</span><span class="o">=</span><span 
class="n">trigger</span><span class="o">.</span><span 
class="n">AfterWatermark</span><span class="p">(</span><span 
class="n">trigger</span><span class="o">.</span><span 
class="n">AfterCount</span><span class="p">(</span><span 
class="mi">10</span><span class="p">),</span>
+                                           <span class="n">trigger</span><span 
class="o">.</span><span class="n">AfterCount</span><span 
class="p">(</span><span class="mi">20</span><span class="p">)),</span>
+            <span class="n">accumulation_mode</span><span 
class="o">=</span><span class="n">trigger</span><span class="o">.</span><span 
class="n">AccumulationMode</span><span class="o">.</span><span 
class="n">ACCUMULATING</span><span class="p">)</span>
+        <span class="c"># Extract and sum teamname/score pairs from the event 
data.</span>
+        <span class="o">|</span> <span class="s">'ExtractAndSumScore'</span> 
<span class="o">&gt;&gt;</span> <span class="n">ExtractAndSumScore</span><span 
class="p">(</span><span class="s">'team'</span><span class="p">))</span>
+</code></pre>
+</div>
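To illustrate the pane sequence that `AfterWatermark` with early `AfterCount(10)` and late `AfterCount(20)` firings is meant to produce for one window, the following is a simplified plain-Python model in accumulating mode. `None` in the input marks the moment the watermark passes the end of the window; the model ignores allowed-lateness expiry and empty-pane behavior, and `panes_for_window` is a hypothetical name.

```python
from typing import List, Optional, Tuple


def panes_for_window(
    arrivals: List[Optional[int]], early_every: int = 10, late_every: int = 20
) -> List[Tuple[str, int]]:
    """Return (timing, accumulated_sum) for each pane of a single window.

    arrivals: scores in arrival order; None marks the watermark passing the
    end of the window (simplified model, not Beam's trigger machinery).
    """
    panes: List[Tuple[str, int]] = []
    total = 0            # accumulating mode: running sum for the window
    since_last_fire = 0  # elements counted toward the next early/late firing
    watermark_passed = False
    for item in arrivals:
        if item is None:
            panes.append(("ON_TIME", total))  # watermark reached the window end
            watermark_passed = True
            since_last_fire = 0
            continue
        total += item
        since_last_fire += 1
        threshold = late_every if watermark_passed else early_every
        if since_last_fire == threshold:
            panes.append(("LATE" if watermark_passed else "EARLY", total))
            since_last_fire = 0
    return panes
```

Because panes accumulate, every pane (early, on-time, or late) carries the window's full sum so far, so downstream consumers can treat the latest pane as the current team score.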
 
 <p>Taken together, these processing strategies let us address the latency and 
completeness issues present in the <code 
class="highlighter-rouge">UserScore</code> and <code 
class="highlighter-rouge">HourlyTeamScore</code> pipelines, while still using 
the same basic transforms to process the data—as a matter of fact, both 
calculations still use the same <code 
class="highlighter-rouge">ExtractAndSumScore</code> transform that we used in 
both the <code class="highlighter-rouge">UserScore</co [...]
 
 <h2 id="gamestats-abuse-detection-and-usage-analysis">GameStats: Abuse 
Detection and Usage Analysis</h2>
 
-<blockquote>
-  <p><strong>Note:</strong> This example currently exists in Java only.</p>
-</blockquote>
-
 <p>While <code class="highlighter-rouge">LeaderBoard</code> demonstrates how 
to use basic windowing and triggers to perform low-latency and flexible data 
analysis, we can use more advanced windowing techniques to perform more 
comprehensive analysis. This might include some calculations designed to detect 
system abuse (like spam) or to gain insight into user behavior. The <code 
class="highlighter-rouge">GameStats</code> pipeline builds on the low-latency 
functionality in <code class="high [...]
 
 <p>Like <code class="highlighter-rouge">LeaderBoard</code>, <code 
class="highlighter-rouge">GameStats</code> reads data from an unbounded source. 
It is best thought of as an ongoing job that provides insight into the game as 
users play.</p>
 
-<blockquote>
+<blockquote class="language-java">
   <p><strong>Note:</strong> See <a 
href="https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java">GameStats
 on GitHub</a> for the complete example pipeline program.</p>
 </blockquote>
 
+<blockquote class="language-py">
+  <p><strong>Note:</strong> See <a 
href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/game_stats.py">GameStats
 on GitHub</a> for the complete example pipeline program.</p>
+</blockquote>
+
 <h3 id="what-does-gamestats-do">What Does GameStats Do?</h3>
 
 <p>Like <code class="highlighter-rouge">LeaderBoard</code>, <code 
class="highlighter-rouge">GameStats</code> calculates the total score per team, 
per hour. However, the pipeline also performs two kinds of more complex 
analysis:</p>
@@ -800,9 +887,9 @@
     <span class="c1">// Filter the user sums using the global mean.</span>
     <span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;</span> <span 
class="n">filtered</span> <span class="o">=</span> <span 
class="n">sumScores</span>
         <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"ProcessAndFilter"</span><span 
class="o">,</span> <span class="n">ParDo</span>
+            <span class="c1">// use the derived mean total score as a side 
input</span>
             <span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span class="n">DoFn</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;,</span> <span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt; [...]
-              <span class="kd">private</span> <span class="kd">final</span> 
<span class="n">Aggregator</span><span class="o">&lt;</span><span 
class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span 
class="o">&gt;</span> <span class="n">numSpammerUsers</span> <span 
class="o">=</span>
-                <span class="n">createAggregator</span><span 
class="o">(</span><span class="s">"SpammerUsers"</span><span class="o">,</span> 
<span class="k">new</span> <span class="n">Sum</span><span 
class="o">.</span><span class="na">SumLongFn</span><span class="o">());</span>
+              <span class="kd">private</span> <span class="kd">final</span> 
<span class="n">Counter</span> <span class="n">numSpammerUsers</span> <span 
class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span 
class="na">counter</span><span class="o">(</span><span 
class="s">"main"</span><span class="o">,</span> <span 
class="s">"SpammerUsers"</span><span class="o">);</span>
               <span class="nd">@ProcessElement</span>
               <span class="kd">public</span> <span class="kt">void</span> 
<span class="nf">processElement</span><span class="o">(</span><span 
class="n">ProcessContext</span> <span class="n">c</span><span 
class="o">)</span> <span class="o">{</span>
                 <span class="n">Integer</span> <span class="n">score</span> 
<span class="o">=</span> <span class="n">c</span><span class="o">.</span><span 
class="na">element</span><span class="o">().</span><span 
class="na">getValue</span><span class="o">();</span>
@@ -810,18 +897,50 @@
                 <span class="k">if</span> <span class="o">(</span><span 
class="n">score</span> <span class="o">&gt;</span> <span 
class="o">(</span><span class="n">gmc</span> <span class="o">*</span> <span 
class="n">SCORE_WEIGHT</span><span class="o">))</span> <span class="o">{</span>
                   <span class="n">LOG</span><span class="o">.</span><span 
class="na">info</span><span class="o">(</span><span class="s">"user "</span> 
<span class="o">+</span> <span class="n">c</span><span class="o">.</span><span 
class="na">element</span><span class="o">().</span><span 
class="na">getKey</span><span class="o">()</span> <span class="o">+</span> 
<span class="s">" spammer score "</span> <span class="o">+</span> <span 
class="n">score</span>
                       <span class="o">+</span> <span class="s">" with mean 
"</span> <span class="o">+</span> <span class="n">gmc</span><span 
class="o">);</span>
-                  <span class="n">numSpammerUsers</span><span 
class="o">.</span><span class="na">addValue</span><span class="o">(</span><span 
class="mi">1L</span><span class="o">);</span>
+                  <span class="n">numSpammerUsers</span><span 
class="o">.</span><span class="na">inc</span><span class="o">();</span>
                   <span class="n">c</span><span class="o">.</span><span 
class="na">output</span><span class="o">(</span><span class="n">c</span><span 
class="o">.</span><span class="na">element</span><span class="o">());</span>
                 <span class="o">}</span>
               <span class="o">}</span>
-            <span class="o">})</span>
-            <span class="c1">// use the derived mean total score as a side 
input</span>
-            <span class="o">.</span><span 
class="na">withSideInputs</span><span class="o">(</span><span 
class="n">globalMeanScore</span><span class="o">));</span>
+            <span class="o">}).</span><span 
class="na">withSideInputs</span><span class="o">(</span><span 
class="n">globalMeanScore</span><span class="o">));</span>
     <span class="k">return</span> <span class="n">filtered</span><span 
class="o">;</span>
   <span class="o">}</span>
 <span class="o">}</span>
 </code></pre>
 </div>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="k">class</span> <span class="nc">CalculateSpammyUsers</span><span 
class="p">(</span><span class="n">beam</span><span class="o">.</span><span 
class="n">PTransform</span><span class="p">):</span>
+  <span class="s">"""Filter out all but those users with a high clickrate, 
which we will
+  consider as 'spammy' users.
+
+  We do this by finding the mean total score per user, then using that
+  information as a side input to filter out all but those user scores that are
+  larger than (mean * SCORE_WEIGHT).
+  """</span>
+  <span class="n">SCORE_WEIGHT</span> <span class="o">=</span> <span 
class="mf">2.5</span>
+
+  <span class="k">def</span> <span class="nf">expand</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="n">user_scores</span><span class="p">):</span>
+    <span class="c"># Get the sum of scores for each user.</span>
+    <span class="n">sum_scores</span> <span class="o">=</span> <span 
class="p">(</span>
+        <span class="n">user_scores</span>
+        <span class="o">|</span> <span class="s">'SumUsersScores'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">CombinePerKey</span><span 
class="p">(</span><span class="nb">sum</span><span class="p">))</span>
+
+    <span class="c"># Extract the score from each element, and use it to find 
the global mean.</span>
+    <span class="n">global_mean_score</span> <span class="o">=</span> <span 
class="p">(</span>
+        <span class="n">sum_scores</span>
+        <span class="o">|</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Values</span><span class="p">()</span>
+        <span class="o">|</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">CombineGlobally</span><span 
class="p">(</span><span class="n">beam</span><span class="o">.</span><span 
class="n">combiners</span><span class="o">.</span><span 
class="n">MeanCombineFn</span><span class="p">())</span>\
+            <span class="o">.</span><span 
class="n">as_singleton_view</span><span class="p">())</span>
+
+    <span class="c"># Filter the user sums using the global mean.</span>
+    <span class="n">filtered</span> <span class="o">=</span> <span 
class="p">(</span>
+        <span class="n">sum_scores</span>
+        <span class="c"># Use the derived mean total score (global_mean_score) 
as a side input.</span>
+        <span class="o">|</span> <span class="s">'ProcessAndFilter'</span> 
<span class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Filter</span><span class="p">(</span>
+            <span class="k">lambda</span> <span class="n">user_score</span><span class="p">,</span> <span class="n">global_mean</span><span class="p">:</span>\
+                <span class="n">user_score</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> <span class="o">&gt;</span> <span class="n">global_mean</span> <span class="o">*</span> <span class="bp">self</span><span class="o">.</span><span class="n">SCORE_WEIGHT</span><span class="p">,</span>
+            <span class="n">global_mean_score</span><span class="p">))</span>
+    <span class="k">return</span> <span class="n">filtered</span>
+</code></pre>
+</div>
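The filtering logic of `CalculateSpammyUsers` can be checked without a pipeline. This plain-Python sketch, with a hypothetical `spammy_users` function, sums scores per user, takes the mean of those sums, and keeps users whose sum exceeds the mean times `SCORE_WEIGHT` (2.5, as in the snippet).

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

SCORE_WEIGHT = 2.5


def spammy_users(user_scores: Iterable[Tuple[str, int]]) -> Dict[str, int]:
    """Return {user: total_score} for users whose total exceeds mean * SCORE_WEIGHT."""
    sums: Dict[str, int] = defaultdict(int)
    for user, score in user_scores:
        sums[user] += score  # analogous to CombinePerKey(sum)
    if not sums:
        return {}
    global_mean = sum(sums.values()) / len(sums)  # analogous to MeanCombineFn
    # Analogous to the Filter step that reads the mean as a side input.
    return {user: total for user, total in sums.items() if total > global_mean * SCORE_WEIGHT}
```

For example, four users with a total of 1 and one user with 100 give a mean of 20.8, so only the user with 100 exceeds the threshold of 52.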
 
 <p>The abuse-detection transform generates a view of users suspected to be 
spambots. Later in the pipeline, we use that view to filter out any such users 
when we calculate the team score per hour, again by using the side input 
mechanism. The following code example shows where we insert the spam filter, 
between windowing the scores into fixed windows and extracting the team 
scores:</p>
 
@@ -842,12 +961,27 @@
                 <span class="n">c</span><span class="o">.</span><span 
class="na">output</span><span class="o">(</span><span class="n">c</span><span 
class="o">.</span><span class="na">element</span><span class="o">());</span>
               <span class="o">}</span>
             <span class="o">}</span>
-          <span class="o">})</span>
-          <span class="o">.</span><span class="na">withSideInputs</span><span 
class="o">(</span><span class="n">spammersView</span><span class="o">))</span>
-  <span class="c1">// Extract and sum teamname/score pairs from the event 
data.</span>
+          <span class="o">}).</span><span 
class="na">withSideInputs</span><span class="o">(</span><span 
class="n">spammersView</span><span class="o">))</span>
+    <span class="c1">// Extract and sum teamname/score pairs from the event 
data.</span>
   <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"ExtractTeamScore"</span><span 
class="o">,</span> <span class="k">new</span> <span 
class="n">ExtractAndSumScore</span><span class="o">(</span><span 
class="s">"team"</span><span class="o">))</span>
 </code></pre>
 </div>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="c"># Calculate the total score per team over fixed windows, and emit 
cumulative</span>
+<span class="c"># updates for late data. Uses the side input derived above 
--the set of</span>
+<span class="c"># suspected robots-- to filter out scores from those users 
from the sum.</span>
+<span class="c"># Write the results to BigQuery.</span>
+<span class="p">(</span><span class="n">raw_events</span>  <span class="c"># 
pylint: disable=expression-not-assigned</span>
+ <span class="o">|</span> <span class="s">'WindowIntoFixedWindows'</span> 
<span class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">WindowInto</span><span class="p">(</span>
+     <span class="n">beam</span><span class="o">.</span><span 
class="n">window</span><span class="o">.</span><span 
class="n">FixedWindows</span><span class="p">(</span><span 
class="n">fixed_window_duration</span><span class="p">))</span>
+
+ <span class="c"># Filter out the detected spammer users, using the side input 
derived above</span>
+ <span class="o">|</span> <span class="s">'FilterOutSpammers'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Filter</span><span class="p">(</span>
+     <span class="k">lambda</span> <span class="n">elem</span><span 
class="p">,</span> <span class="n">spammers</span><span class="p">:</span> 
<span class="n">elem</span><span class="p">[</span><span 
class="s">'user'</span><span class="p">]</span> <span class="ow">not</span> 
<span class="ow">in</span> <span class="n">spammers</span><span 
class="p">,</span>
+     <span class="n">spammers_view</span><span class="p">)</span>
+ <span class="c"># Extract and sum teamname/score pairs from the event 
data.</span>
+ <span class="o">|</span> <span class="s">'ExtractAndSumScore'</span> <span 
class="o">&gt;&gt;</span> <span class="n">ExtractAndSumScore</span><span 
class="p">(</span><span class="s">'team'</span><span class="p">)</span>
+</code></pre>
+</div>
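Here is a plain-Python sketch of the same per-window team sum with spammers removed. Events are dicts with `user`, `team`, `score`, and an event time in minutes (the `minute` key and the function name `team_scores_without_spammers` are hypothetical); the `spammers` set stands in for the side input.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple


def team_scores_without_spammers(
    events: List[dict], spammers: Set[str], window_minutes: int = 60
) -> Dict[Tuple[int, str], int]:
    """Return {(window_start_minute, team): total_score}, skipping spammer users."""
    totals: Dict[Tuple[int, str], int] = defaultdict(int)
    for event in events:
        if event["user"] in spammers:  # like FilterOutSpammers with the side input
            continue
        start = event["minute"] - event["minute"] % window_minutes  # like FixedWindows
        totals[(start, event["team"])] += event["score"]  # like ExtractAndSumScore('team')
    return dict(totals)
```

Filtering before summing means a suspected spammer's scores never reach any team total, rather than being subtracted afterwards.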
 
 <h4 id="analyzing-usage-patterns">Analyzing Usage Patterns</h4>
 
@@ -860,7 +994,7 @@
 <figure id="fig6">
     <img src="/images/gaming-example-session-windows.png" width="662" 
height="521" alt="User sessions, with a minimum gap duration." />
 </figure>
-<p>Figure 6: User sessions, with a minimum gap duration. Note how each user 
has different sessions, according to how many instances they play and how long 
their breaks between instances are.</p>
+<p><strong>Figure 6:</strong> User sessions, with a minimum gap duration. Note 
how each user has different sessions, according to how many instances they play 
and how long their breaks between instances are.</p>
 
 <p>We can use the session-windowed data to determine the average length of 
uninterrupted play time for all of our users, as well as the total score they 
achieve during each session. We can do this in the code by first applying 
session windows, summing the score per user and session, and then using a 
transform to calculate the length of each individual session:</p>
 
@@ -871,7 +1005,7 @@
 <span class="n">userEvents</span>
   <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"WindowIntoSessions"</span><span 
class="o">,</span> <span class="n">Window</span><span 
class="o">.&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;</span><span 
class="n">into</span><span class="o">(</span>
       <span class="n">Sessions</span><span class="o">.</span><span 
class="na">withGapDuration</span><span class="o">(</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">standardMinutes</span><span class="o">(</span><span 
class="n">options</span><span class="o">.</span><span 
class="na">getSessionGap</span><span class="o">())))</span>
-      <span class="o">.</span><span class="na">withOutputTimeFn</span><span 
class="o">(</span><span class="n">OutputTimeFns</span><span 
class="o">.</span><span class="na">outputAtEndOfWindow</span><span 
class="o">()))</span>
+      <span class="o">.</span><span 
class="na">withTimestampCombiner</span><span class="o">(</span><span 
class="n">TimestampCombiner</span><span class="o">.</span><span 
class="na">END_OF_WINDOW</span><span class="o">))</span>
   <span class="c1">// For this use, we care only about the existence of the 
session, not any particular</span>
   <span class="c1">// information aggregated over it, so the following is an 
efficient way to do that.</span>
   <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="n">Combine</span><span class="o">.</span><span 
class="na">perKey</span><span class="o">(</span><span class="n">x</span> <span 
class="o">-&gt;</span> <span class="mi">0</span><span class="o">))</span>
@@ -879,6 +1013,24 @@
   <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"UserSessionActivity"</span><span 
class="o">,</span> <span class="n">ParDo</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="n">UserSessionInfoFn</span><span class="o">()))</span>
 </code></pre>
 </div>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="c"># Detect user sessions-- that is, a burst of activity separated by a 
gap</span>
+<span class="c"># from further activity. Find and record the mean session 
lengths.</span>
+<span class="c"># This information could help the game designers track the 
changing user</span>
+<span class="c"># engagement as their set of games changes.</span>
+<span class="p">(</span><span class="n">user_events</span>  <span class="c"># pylint: disable=expression-not-assigned</span>
+ <span class="o">|</span> <span class="s">'WindowIntoSessions'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span>
+     <span class="n">beam</span><span class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">Sessions</span><span class="p">(</span><span class="n">session_gap</span><span class="p">),</span>
+     <span class="n">timestamp_combiner</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">TimestampCombiner</span><span class="o">.</span><span class="n">OUTPUT_AT_EOW</span><span class="p">)</span>
+
+ <span class="c"># For this use, we care only about the existence of the session, not any</span>
+ <span class="c"># particular information aggregated over it, so we can just group by key</span>
+ <span class="c"># and assign a "dummy value" of None.</span>
+ <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CombinePerKey</span><span class="p">(</span><span class="k">lambda</span> <span class="n">_</span><span class="p">:</span> <span class="bp">None</span><span class="p">)</span>
+
+ <span class="c"># Get the duration of the session</span>
+ <span class="o">|</span> <span class="s">'UserSessionActivity'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">UserSessionActivity</span><span class="p">())</span>
+</code></pre>
+</div>
 
 <p>This gives us a set of user sessions, each with an attached duration. We can then calculate the <em>average</em> session length by re-windowing the data into fixed time windows, and then calculating the average for all sessions that end in each hour:</p>
 
@@ -890,7 +1042,24 @@
 <span class="c1">// Write this info to a BigQuery table.</span>
 <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"WriteAvgSessionLength"</span><span class="o">,</span>
        <span class="k">new</span> <span class="n">WriteWindowedToBigQuery</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;(</span>
-          <span class="n">options</span><span class="o">.</span><span class="na">getTablePrefix</span><span class="o">()</span> <span class="o">+</span> <span class="s">"_sessions"</span><span class="o">,</span> <span class="n">configureSessionWindowWrite</span><span class="o">()));</span>
+           <span class="n">options</span><span class="o">.</span><span class="na">as</span><span class="o">(</span><span class="n">GcpOptions</span><span class="o">.</span><span class="na">class</span><span class="o">).</span><span class="na">getProject</span><span class="o">(),</span>
+           <span class="n">options</span><span class="o">.</span><span class="na">getDataset</span><span class="o">(),</span>
+           <span class="n">options</span><span class="o">.</span><span class="na">getGameStatsTablePrefix</span><span class="o">()</span> <span class="o">+</span> <span class="s">"_sessions"</span><span class="o">,</span> <span class="n">configureSessionWindowWrite</span><span class="o">()));</span>
+</code></pre>
+</div>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># Re-window to process groups of session sums according to when the</span>
+<span class="c"># sessions complete</span>
+<span class="o">|</span> <span class="s">'WindowToExtractSessionMean'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span>
+    <span class="n">beam</span><span class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">FixedWindows</span><span class="p">(</span><span class="n">user_activity_window_duration</span><span class="p">))</span>
+
+<span class="c"># Find the mean session duration in each window</span>
+<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CombineGlobally</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">combiners</span><span class="o">.</span><span class="n">MeanCombineFn</span><span class="p">())</span><span class="o">.</span><span class="n">without_defaults</span><span class="p">()</span>
+<span class="o">|</span> <span class="s">'FormatAvgSessionLength'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span>
+    <span class="k">lambda</span> <span class="n">elem</span><span class="p">:</span> <span class="p">{</span><span class="s">'mean_duration'</span><span class="p">:</span> <span class="nb">float</span><span class="p">(</span><span class="n">elem</span><span class="p">)})</span>
+<span class="o">|</span> <span class="s">'WriteAvgSessionLength'</span> <span class="o">&gt;&gt;</span> <span class="n">WriteToBigQuery</span><span class="p">(</span>
+    <span class="n">args</span><span class="o">.</span><span class="n">table_name</span> <span class="o">+</span> <span class="s">'_sessions'</span><span class="p">,</span> <span class="n">args</span><span class="o">.</span><span class="n">dataset</span><span class="p">,</span> <span class="p">{</span>
+        <span class="s">'mean_duration'</span><span class="p">:</span> <span class="s">'FLOAT'</span><span class="p">,</span>
+    <span class="p">}))</span>
 </code></pre>
 </div>
 
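The Python session-detection step in this diff (`beam.window.Sessions(session_gap)` with `OUTPUT_AT_EOW`, followed by `CombinePerKey` and `UserSessionActivity`) can be hard to picture from the snippet alone. Below is a minimal plain-Python sketch of what that windowing computes for each user. It does not use the Beam SDK. The function names, the integer minute timestamps, and the sample data are hypothetical. The sketch assumes the merging behavior of Beam's session windows: each event proposes the interval [t, t + gap), and overlapping intervals merge into one session. Under that assumption, a session's window runs from its first event to its last event plus the gap, and its duration is window end minus window start.

```python
from typing import Dict, List, Tuple


def session_windows(timestamps: List[int], gap: int) -> List[Tuple[int, int]]:
    """Merge per-event intervals [t, t + gap) into session windows.

    Hypothetical helper: mimics the idea behind session windowing, where
    each event proposes its own interval and overlapping intervals merge.
    """
    windows: List[Tuple[int, int]] = []
    for t in sorted(timestamps):
        if windows and t < windows[-1][1]:
            # This event's interval overlaps the open session: extend it.
            start, end = windows[-1]
            windows[-1] = (start, max(end, t + gap))
        else:
            # No overlap with the previous session: start a new one.
            windows.append((t, t + gap))
    return windows


def session_durations(events: Dict[str, List[int]], gap: int) -> Dict[str, List[int]]:
    """Per user (the key), emit one duration (window end - start) per session."""
    return {
        user: [end - start for start, end in session_windows(ts, gap)]
        for user, ts in events.items()
    }


if __name__ == "__main__":
    # Hypothetical event times in minutes for two users, with a 5-minute gap.
    events = {"alice": [0, 2, 4, 20, 21], "bob": [10]}
    print(session_durations(events, gap=5))
```

The per-user dictionary plays the role of the key in `CombinePerKey`. As in the pipeline, only the existence and extent of each session matter, not the individual events inside it.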

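The second Python step in this diff re-windows the per-session durations into fixed windows (`FixedWindows(user_activity_window_duration)`) and averages them with `CombineGlobally(MeanCombineFn()).without_defaults()`. Below is a minimal plain-Python sketch of that aggregation, again without the Beam SDK. The function name and sample values are hypothetical. Each input pair carries the timestamp at which a session completed (in the Beam pipeline, the end-of-window timestamp set by `OUTPUT_AT_EOW`) and the session's duration. Each session is assigned to the fixed window that contains that timestamp. Windows with no sessions produce no output, which mirrors the intent of `without_defaults()`.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def mean_session_length_per_window(
    sessions: List[Tuple[int, int]], window_size: int
) -> Dict[int, float]:
    """Average session durations per fixed window.

    sessions: hypothetical (completion_timestamp, duration) pairs.
    Returns {window_start: mean_duration}; empty windows are absent.
    """
    sums: Dict[int, float] = defaultdict(float)
    counts: Dict[int, int] = defaultdict(int)
    for completed_at, duration in sessions:
        # Fixed windows are [k * size, (k + 1) * size); find k for this session.
        window_start = (completed_at // window_size) * window_size
        sums[window_start] += duration
        counts[window_start] += 1
    # Mean over all sessions (across all users) that completed in each window.
    return {w: sums[w] / counts[w] for w in sorted(sums)}


if __name__ == "__main__":
    # Hypothetical (completion minute, duration in minutes) pairs, 60-minute windows.
    sessions = [(9, 9), (26, 6), (15, 5), (130, 12)]
    print(mean_session_length_per_window(sessions, window_size=60))
```

Like `CombineGlobally`, the sketch averages across all users rather than per user. The per-user grouping has already been consumed by the session step.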