Regenerate website

Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/b76eb56b
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/b76eb56b
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/b76eb56b

Branch: refs/heads/asf-site
Commit: b76eb56b6986f99aa73ce38f8e719803ab18fb4e
Parents: bf62dc9
Author: Ahmet Altay <al...@google.com>
Authored: Tue May 2 18:26:45 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue May 2 18:26:45 2017 -0700

----------------------------------------------------------------------
 .../mobile-gaming-example/index.html            | 161 ++++++++++++++++++-
 1 file changed, 157 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/b76eb56b/content/get-started/mobile-gaming-example/index.html
----------------------------------------------------------------------
diff --git a/content/get-started/mobile-gaming-example/index.html b/content/get-started/mobile-gaming-example/index.html
index 054024c..7a3fbb4 100644
--- a/content/get-started/mobile-gaming-example/index.html
+++ b/content/get-started/mobile-gaming-example/index.html
@@ -188,12 +188,24 @@
   </li>
 </ul>
 
+<nav class="language-switcher">
+  <strong>Adapt for:</strong> 
+  <ul>
+    <li data-type="language-java">Java SDK</li>
+    <li data-type="language-py">Python SDK</li>
+  </ul>
+</nav>
+
 <p>This section provides a walkthrough of a series of example Apache Beam 
pipelines that demonstrate more complex functionality than the basic <a 
href="/get-started/wordcount-example">WordCount</a> examples. The pipelines in 
this section process data from a hypothetical game that users play on their 
mobile phones. The pipelines demonstrate processing at increasing levels of 
complexity; the first pipeline, for example, shows how to run a batch analysis 
job to obtain relatively simple score data, while the later pipelines use 
Beam’s windowing and triggers features to provide low-latency data analysis 
and more complex intelligence about users’ play patterns.</p>
 
-<blockquote>
+<blockquote class="language-java">
   <p><strong>Note</strong>: These examples assume some familiarity with the 
Beam programming model. If you haven’t already, we recommend familiarizing 
yourself with the programming model documentation and running a basic example 
pipeline before continuing. Note also that these examples use the Java 8 lambda 
syntax, and thus require Java 8. However, you can create pipelines with 
equivalent functionality using Java 7.</p>
 </blockquote>
 
+<blockquote class="language-py">
+  <p><strong>Note</strong>: These examples assume some familiarity with the 
Beam programming model. If you haven’t already, we recommend familiarizing 
yourself with the programming model documentation and running a basic example 
pipeline before continuing.</p>
+</blockquote>
+
 <p>Every time a user plays an instance of our hypothetical mobile game, they 
generate a data event. Each data event consists of the following 
information:</p>
 
 <ul>
@@ -224,10 +236,14 @@
 
 <p>The <code class="highlighter-rouge">UserScore</code> pipeline is the 
simplest example for processing mobile game data. <code 
class="highlighter-rouge">UserScore</code> determines the total score per user 
over a finite data set (for example, one day’s worth of scores stored on the 
game server). Pipelines like <code class="highlighter-rouge">UserScore</code> 
are best run periodically after all relevant data has been gathered. For 
example, <code class="highlighter-rouge">UserScore</code> could run as a 
nightly job over data gathered during that day.</p>
 
-<blockquote>
+<blockquote class="language-java">
   <p><strong>Note:</strong> See <a 
href="https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java">UserScore
 on GitHub</a> for the complete example pipeline program.</p>
 </blockquote>
 
+<blockquote class="language-py">
+  <p><strong>Note:</strong> See <a 
href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/user_score.py">UserScore
 on GitHub</a> for the complete example pipeline program.</p>
+</blockquote>
+
 <h3 id="what-does-userscore-do">What Does UserScore Do?</h3>
 
 <p>In a day’s worth of scoring data, each user ID may have multiple records 
(if the user plays more than one instance of the game during the analysis 
window), each with their own score value and timestamp. If we want to determine 
the total score over all the instances a user plays during the day, our 
pipeline will need to group all the records together per individual user.</p>
@@ -283,6 +299,28 @@
 </code></pre>
 </div>
 
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="k">class</span> <span class="nc">ExtractAndSumScore</span><span 
class="p">(</span><span class="n">beam</span><span class="o">.</span><span 
class="n">PTransform</span><span class="p">):</span>
+  <span class="s">"""A transform to extract key/score information and sum the 
scores.
+  The constructor argument `field` determines whether 'team' or 'user' info is
+  extracted.
+  """</span>
+  <span class="k">def</span> <span class="nf">__init__</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="n">field</span><span class="p">):</span>
+    <span class="nb">super</span><span class="p">(</span><span 
class="n">ExtractAndSumScore</span><span class="p">,</span> <span 
class="bp">self</span><span class="p">)</span><span class="o">.</span><span 
class="n">__init__</span><span class="p">()</span>
+    <span class="bp">self</span><span class="o">.</span><span 
class="n">field</span> <span class="o">=</span> <span class="n">field</span>
+
+  <span class="k">def</span> <span class="nf">expand</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="n">pcoll</span><span class="p">):</span>
+    <span class="k">return</span> <span class="p">(</span><span 
class="n">pcoll</span>
+            <span class="o">|</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Map</span><span class="p">(</span><span 
class="k">lambda</span> <span class="n">info</span><span class="p">:</span> 
<span class="p">(</span><span class="n">info</span><span 
class="p">[</span><span class="bp">self</span><span class="o">.</span><span 
class="n">field</span><span class="p">],</span> <span 
class="n">info</span><span class="p">[</span><span 
class="s">'score'</span><span class="p">]))</span>
+            <span class="o">|</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">CombinePerKey</span><span 
class="p">(</span><span class="n">sum_ints</span><span class="p">))</span>
+
+<span class="k">def</span> <span 
class="nf">configure_bigquery_write</span><span class="p">():</span>
+  <span class="k">return</span> <span class="p">[</span>
+      <span class="p">(</span><span class="s">'user'</span><span 
class="p">,</span> <span class="s">'STRING'</span><span class="p">,</span> 
<span class="k">lambda</span> <span class="n">e</span><span class="p">:</span> 
<span class="n">e</span><span class="p">[</span><span class="mi">0</span><span 
class="p">]),</span>
+      <span class="p">(</span><span class="s">'total_score'</span><span 
class="p">,</span> <span class="s">'INTEGER'</span><span class="p">,</span> 
<span class="k">lambda</span> <span class="n">e</span><span class="p">:</span> 
<span class="n">e</span><span class="p">[</span><span class="mi">1</span><span 
class="p">]),</span>
+  <span class="p">]</span>
+</code></pre>
+</div>
+
 <p><code class="highlighter-rouge">ExtractAndSumScore</code> is written to be 
more general, in that you can pass in the field by which you want to group the 
data (in the case of our game, by unique user or unique team). This means we 
can re-use <code class="highlighter-rouge">ExtractAndSumScore</code> in other 
pipelines that group score data by team, for example.</p>
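To see the arithmetic without a runner, the sketch below reproduces in plain Python what `ExtractAndSumScore` computes: it keys each event by the chosen field and sums the scores per key. It is an illustration, not the Beam implementation. The function name `extract_and_sum_score` and the sample events are hypothetical, and events are assumed to be dicts with `user`, `team`, and `score` keys, like the parsed records the Python snippets index into.

```python
from collections import defaultdict
from typing import Dict, Iterable, Mapping


def extract_and_sum_score(events: Iterable[Mapping], field: str) -> Dict[str, int]:
    """Sum each event's 'score', keyed by the value of `field` ('user' or 'team')."""
    totals: Dict[str, int] = defaultdict(int)
    for event in events:
        # Same shape as the Beam version: build (event[field], event['score'])
        # pairs, then add up the scores that share a key.
        totals[event[field]] += event["score"]
    return dict(totals)


# Hypothetical sample data.
events = [
    {"user": "alice", "team": "red", "score": 12},
    {"user": "bob", "team": "red", "score": 5},
    {"user": "alice", "team": "red", "score": 3},
    {"user": "carol", "team": "blue", "score": 7},
]
print(extract_and_sum_score(events, "user"))  # {'alice': 15, 'bob': 5, 'carol': 7}
print(extract_and_sum_score(events, "team"))  # {'red': 20, 'blue': 7}
```

Passing the field name in, rather than hard-coding `user`, is what lets the same logic serve both the per-user and the per-team pipelines.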
 
 <p>Here’s the main method of <code 
class="highlighter-rouge">UserScore</code>, showing how we apply all three 
steps of the pipeline:</p>
@@ -307,6 +345,25 @@
 </code></pre>
 </div>
 
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="k">def</span> <span class="nf">run</span><span class="p">(</span><span 
class="n">argv</span><span class="o">=</span><span class="bp">None</span><span 
class="p">):</span>
+  <span class="s">"""Main entry point; defines and runs the user_score 
pipeline."""</span>
+  
+  <span class="o">...</span>
+
+  <span class="n">pipeline_options</span> <span class="o">=</span> <span 
class="n">PipelineOptions</span><span class="p">(</span><span 
class="n">pipeline_args</span><span class="p">)</span>
+  <span class="n">p</span> <span class="o">=</span> <span 
class="n">beam</span><span class="o">.</span><span 
class="n">Pipeline</span><span class="p">(</span><span 
class="n">options</span><span class="o">=</span><span 
class="n">pipeline_options</span><span class="p">)</span>
+
+  <span class="p">(</span><span class="n">p</span>  <span class="c"># pylint: 
disable=expression-not-assigned</span>
+   <span class="o">|</span> <span class="n">ReadFromText</span><span 
class="p">(</span><span class="n">known_args</span><span 
class="o">.</span><span class="nb">input</span><span class="p">)</span> <span 
class="c"># Read events from a file and parse them.</span>
+   <span class="o">|</span> <span class="n">UserScore</span><span 
class="p">()</span>
+   <span class="o">|</span> <span class="n">WriteToBigQuery</span><span 
class="p">(</span>
+       <span class="n">known_args</span><span class="o">.</span><span 
class="n">table_name</span><span class="p">,</span> <span 
class="n">known_args</span><span class="o">.</span><span 
class="n">dataset</span><span class="p">,</span> <span 
class="n">configure_bigquery_write</span><span class="p">()))</span>
+
+  <span class="n">result</span> <span class="o">=</span> <span 
class="n">p</span><span class="o">.</span><span class="n">run</span><span 
class="p">()</span>
+  <span class="n">result</span><span class="o">.</span><span 
class="n">wait_until_finish</span><span class="p">()</span>
+</code></pre>
+</div>
+
 <h3 id="working-with-the-results">Working with the Results</h3>
 
 <p><code class="highlighter-rouge">UserScore</code> writes the data to a 
BigQuery table (called <code class="highlighter-rouge">user_score</code> by 
default). With the data in the BigQuery table, we might perform a further 
interactive analysis, such as querying for a list of the N top-scoring users 
for a given day.</p>
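As a rough local illustration of that follow-up analysis, the sketch below ranks `(user, total_score)` rows, mirroring the two columns that `configure_bigquery_write` defines, and keeps the top N. In BigQuery itself you would more likely write a query ending in `ORDER BY total_score DESC LIMIT N`; the helper name `top_n_users` and the sample rows are hypothetical.

```python
import heapq
from typing import Iterable, List, Tuple


def top_n_users(rows: Iterable[Tuple[str, int]], n: int) -> List[Tuple[str, int]]:
    """Return the n (user, total_score) rows with the highest total_score, best first."""
    return heapq.nlargest(n, rows, key=lambda row: row[1])


# Hypothetical rows shaped like the 'user' and 'total_score' output columns.
rows = [("alice", 15), ("bob", 5), ("carol", 7), ("dave", 21)]
print(top_n_users(rows, 2))  # [('dave', 21), ('alice', 15)]
```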
@@ -344,10 +401,14 @@
 
 <p>Like <code class="highlighter-rouge">UserScore</code>, <code 
class="highlighter-rouge">HourlyTeamScore</code> is best thought of as a job to 
be run periodically after all the relevant data has been gathered (such as once 
per day). The pipeline reads a fixed data set from a file, and writes the 
results to a Google Cloud BigQuery table, just like <code 
class="highlighter-rouge">UserScore</code>.</p>
 
-<blockquote>
+<blockquote class="language-java">
   <p><strong>Note:</strong> See <a 
href="https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java">HourlyTeamScore
 on GitHub</a> for the complete example pipeline program.</p>
 </blockquote>
 
+<blockquote class="language-py">
+  <p><strong>Note:</strong> See <a 
href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py">HourlyTeamScore
 on GitHub</a> for the complete example pipeline program.</p>
+</blockquote>
+
 <h3 id="what-does-hourlyteamscore-do">What Does HourlyTeamScore Do?</h3>
 
 <p><code class="highlighter-rouge">HourlyTeamScore</code> calculates the total 
score per team, per hour, in a fixed data set (such as one day’s worth of 
data).</p>
@@ -382,7 +443,11 @@
 
 <p>Beam’s windowing feature uses the <a 
href="/documentation/programming-guide/#pctimestamps">intrinsic timestamp 
information</a> attached to each element of a <code 
class="highlighter-rouge">PCollection</code>. Because we want our pipeline to 
window based on <em>event time</em>, we <strong>must first extract the 
timestamp</strong> that’s embedded in each data record and apply it to the
corresponding element in the <code class="highlighter-rouge">PCollection</code> 
of score data. Then, the pipeline can <strong>apply the windowing 
function</strong> to divide the <code 
class="highlighter-rouge">PCollection</code> into logical windows.</p>
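The sketch below shows, in plain Python, how a fixed-window function with a zero offset maps an event's timestamp to a window: it converts the millisecond timestamp to seconds (the same `/ 1000.0` step the Python pipeline applies before building a `TimestampedValue`) and rounds down to a multiple of the window size. Beam performs this assignment internally once you apply `FixedWindows`; the helper `fixed_window_for` is hypothetical and assumes timestamps are epoch milliseconds.

```python
from typing import Tuple


def fixed_window_for(timestamp_ms: int, window_minutes: int = 60) -> Tuple[float, float]:
    """Return the [start, end) bounds, in seconds, of the fixed window holding an event."""
    # Event records carry milliseconds; convert to seconds first.
    seconds = timestamp_ms / 1000.0
    # Window size in seconds, like the window_duration * 60 passed to FixedWindows.
    size = window_minutes * 60
    # With a zero offset, windows are aligned to the Unix epoch.
    start = seconds - (seconds % size)
    return start, start + size


print(fixed_window_for(1_493_770_000_000))  # (1493769600.0, 1493773200.0)
```

Every event whose timestamp falls in the same hour gets the same `(start, end)` pair, which is what lets a later per-key combine produce one result per window.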
 
-<p>Here’s the code, which shows how <code 
class="highlighter-rouge">HourlyTeamScore</code> uses the <a 
href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java">WithTimestamps</a>
 and <a 
href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java">Window</a>
 transforms to perform these operations:</p>
+<p class="language-java"><code 
class="highlighter-rouge">HourlyTeamScore</code> uses the <a 
href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java">WithTimestamps</a>
 and <a 
href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java">Window</a>
 transforms to perform these operations.</p>
+
+<p class="language-py"><code class="highlighter-rouge">HourlyTeamScore</code> 
uses <code class="highlighter-rouge">TimestampedValue</code> and the <code 
class="highlighter-rouge">FixedWindows</code> windowing function, both found 
in <a 
href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/window.py">window.py</a>,
 together with the <code class="highlighter-rouge">WindowInto</code> transform, 
to perform these operations.</p>
+
+<p>The following code shows this:</p>
 
 <div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="c1">// Add an element timestamp based on 
the event log, and apply fixed windowing.</span>
     <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"AddEventTimestamps"</span><span 
class="o">,</span>
@@ -392,6 +457,17 @@
 </code></pre>
 </div>
 
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="c"># Add an element timestamp based on the event log, and apply fixed 
windowing.</span>
+<span class="c"># Convert element['timestamp'] into seconds as expected by 
TimestampedValue.</span>
+<span class="o">|</span> <span class="s">'AddEventTimestamps'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Map</span><span class="p">(</span>
+    <span class="k">lambda</span> <span class="n">element</span><span 
class="p">:</span> <span class="n">TimestampedValue</span><span 
class="p">(</span>
+        <span class="n">element</span><span class="p">,</span> <span 
class="n">element</span><span class="p">[</span><span 
class="s">'timestamp'</span><span class="p">]</span> <span class="o">/</span> 
<span class="mf">1000.0</span><span class="p">))</span>
+<span class="c"># Convert window_duration into seconds as expected by 
FixedWindows.</span>
+<span class="o">|</span> <span class="s">'FixedWindowsTeam'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">WindowInto</span><span 
class="p">(</span><span class="n">FixedWindows</span><span class="p">(</span>
+    <span class="n">size</span><span class="o">=</span><span 
class="bp">self</span><span class="o">.</span><span 
class="n">window_duration</span> <span class="o">*</span> <span 
class="mi">60</span><span class="p">))</span>
+</code></pre>
+</div>
+
 <p>Notice that the transforms the pipeline uses to specify the windowing are 
distinct from the actual data processing transforms (such as <code 
class="highlighter-rouge">ExtractAndSumScore</code>). This separation 
gives you flexibility in designing your Beam pipeline: you can 
run existing transforms over datasets with different windowing 
characteristics.</p>
 
 <h4 id="filtering-based-on-event-time">Filtering Based On Event Time</h4>
@@ -413,6 +489,13 @@
 </code></pre>
 </div>
 
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="o">|</span> <span class="s">'FilterStartTime'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Filter</span><span class="p">(</span>
+    <span class="k">lambda</span> <span class="n">element</span><span 
class="p">:</span> <span class="n">element</span><span class="p">[</span><span 
class="s">'timestamp'</span><span class="p">]</span> <span 
class="o">&gt;</span> <span class="n">start_min_filter</span><span 
class="p">)</span>
+<span class="o">|</span> <span class="s">'FilterEndTime'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Filter</span><span class="p">(</span>
+    <span class="k">lambda</span> <span class="n">element</span><span 
class="p">:</span> <span class="n">element</span><span class="p">[</span><span 
class="s">'timestamp'</span><span class="p">]</span> <span 
class="o">&lt;</span> <span class="n">end_min_filter</span><span 
class="p">)</span>
+</code></pre>
+</div>
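The filter step can be sketched without Beam as follows. This assumes, hypothetically, that event timestamps are epoch milliseconds and that the start and stop arguments are UTC strings in a `YYYY-MM-DD-HH-MM` format; `to_epoch_ms` is an illustrative stand-in for the pipeline's `string_to_timestamp` helper, whose definition isn't shown here. Whatever the real helper returns, both sides of each comparison must use the same unit.

```python
from datetime import datetime, timezone
from typing import Dict, Iterable, List


def to_epoch_ms(value: str, fmt: str = "%Y-%m-%d-%H-%M") -> int:
    """Parse a UTC time string (format assumed) into epoch milliseconds."""
    dt = datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)


def filter_by_event_time(events: Iterable[Dict], start_min: str, stop_min: str) -> List[Dict]:
    """Keep only events strictly between the start and stop times."""
    start_ms = to_epoch_ms(start_min)
    end_ms = to_epoch_ms(stop_min)
    # Strict '>' and '<', mirroring the FilterStartTime and FilterEndTime lambdas.
    return [e for e in events if start_ms < e["timestamp"] < end_ms]


# Hypothetical batch that includes late data from the day before and the day after.
events = [
    {"team": "red", "score": 3, "timestamp": 1_493_683_100_000},   # 2017-05-01 23:58:20 UTC
    {"team": "red", "score": 8, "timestamp": 1_493_700_000_000},   # 2017-05-02 04:40:00 UTC
    {"team": "blue", "score": 5, "timestamp": 1_493_769_700_000},  # 2017-05-03 00:01:40 UTC
]
kept = filter_by_event_time(events, "2017-05-02-00-00", "2017-05-03-00-00")
print([e["score"] for e in kept])  # [8]
```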
+
 <h4 id="calculating-score-per-team-per-window">Calculating Score Per Team, Per 
Window</h4>
 
 <p><code class="highlighter-rouge">HourlyTeamScore</code> uses the same <code 
class="highlighter-rouge">ExtractAndSumScore</code> transform as the <code 
class="highlighter-rouge">UserScore</code> pipeline, but passes a different key 
(team, as opposed to user). Also, because the pipeline applies <code 
class="highlighter-rouge">ExtractAndSumScore</code> <em>after</em> applying 
fixed-time 1-hour windowing to the input data, the data gets grouped by both 
team <em>and</em> window. You can see the full sequence of transforms in <code 
class="highlighter-rouge">HourlyTeamScore</code>’s main method:</p>
@@ -464,6 +547,68 @@
 </code></pre>
 </div>
 
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span 
class="k">class</span> <span class="nc">HourlyTeamScore</span><span 
class="p">(</span><span class="n">beam</span><span class="o">.</span><span 
class="n">PTransform</span><span class="p">):</span>
+  <span class="k">def</span> <span class="nf">__init__</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="n">start_min</span><span class="p">,</span> <span 
class="n">stop_min</span><span class="p">,</span> <span 
class="n">window_duration</span><span class="p">):</span>
+    <span class="nb">super</span><span class="p">(</span><span 
class="n">HourlyTeamScore</span><span class="p">,</span> <span 
class="bp">self</span><span class="p">)</span><span class="o">.</span><span 
class="n">__init__</span><span class="p">()</span>
+    <span class="bp">self</span><span class="o">.</span><span 
class="n">start_min</span> <span class="o">=</span> <span 
class="n">start_min</span>
+    <span class="bp">self</span><span class="o">.</span><span 
class="n">stop_min</span> <span class="o">=</span> <span 
class="n">stop_min</span>
+    <span class="bp">self</span><span class="o">.</span><span 
class="n">window_duration</span> <span class="o">=</span> <span 
class="n">window_duration</span>
+
+  <span class="k">def</span> <span class="nf">expand</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="n">pcoll</span><span class="p">):</span>
+    <span class="n">start_min_filter</span> <span class="o">=</span> <span 
class="n">string_to_timestamp</span><span class="p">(</span><span 
class="bp">self</span><span class="o">.</span><span 
class="n">start_min</span><span class="p">)</span>
+    <span class="n">end_min_filter</span> <span class="o">=</span> <span 
class="n">string_to_timestamp</span><span class="p">(</span><span 
class="bp">self</span><span class="o">.</span><span 
class="n">stop_min</span><span class="p">)</span>
+
+    <span class="k">return</span> <span class="p">(</span>
+        <span class="n">pcoll</span>
+        <span class="o">|</span> <span class="s">'ParseGameEvent'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span 
class="n">ParseEventFn</span><span class="p">())</span>
+        <span class="c"># Filter out data before and after the given times so 
that it is not</span>
+        <span class="c"># included in the calculations. As we collect data in 
batches (say, by</span>
+        <span class="c"># day), the batch for the day that we want to analyze 
could potentially</span>
+        <span class="c"># include some late-arriving data from the previous 
day. If so, we want</span>
+        <span class="c"># to weed it out. Similarly, if we include data from 
the following day</span>
+        <span class="c"># (to scoop up late-arriving events from the day we're 
analyzing), we</span>
+        <span class="c"># need to weed out events that fall after the time 
period we want to</span>
+        <span class="c"># analyze.</span>
+        <span class="o">|</span> <span class="s">'FilterStartTime'</span> 
<span class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Filter</span><span class="p">(</span>
+            <span class="k">lambda</span> <span class="n">element</span><span 
class="p">:</span> <span class="n">element</span><span class="p">[</span><span 
class="s">'timestamp'</span><span class="p">]</span> <span 
class="o">&gt;</span> <span class="n">start_min_filter</span><span 
class="p">)</span>
+        <span class="o">|</span> <span class="s">'FilterEndTime'</span> <span 
class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Filter</span><span class="p">(</span>
+            <span class="k">lambda</span> <span class="n">element</span><span 
class="p">:</span> <span class="n">element</span><span class="p">[</span><span 
class="s">'timestamp'</span><span class="p">]</span> <span 
class="o">&lt;</span> <span class="n">end_min_filter</span><span 
class="p">)</span>
+        <span class="c"># Add an element timestamp based on the event log, and 
apply fixed</span>
+        <span class="c"># windowing.</span>
+        <span class="c"># Convert element['timestamp'] into seconds as 
expected by</span>
+        <span class="c"># TimestampedValue.</span>
+        <span class="o">|</span> <span class="s">'AddEventTimestamps'</span> 
<span class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Map</span><span class="p">(</span>
+            <span class="k">lambda</span> <span class="n">element</span><span 
class="p">:</span> <span class="n">TimestampedValue</span><span 
class="p">(</span>
+                <span class="n">element</span><span class="p">,</span> <span 
class="n">element</span><span class="p">[</span><span 
class="s">'timestamp'</span><span class="p">]</span> <span class="o">/</span> 
<span class="mf">1000.0</span><span class="p">))</span>
+        <span class="c"># Convert window_duration into seconds as expected by 
FixedWindows.</span>
+        <span class="o">|</span> <span class="s">'FixedWindowsTeam'</span> 
<span class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">WindowInto</span><span 
class="p">(</span><span class="n">FixedWindows</span><span class="p">(</span>
+            <span class="n">size</span><span class="o">=</span><span 
class="bp">self</span><span class="o">.</span><span 
class="n">window_duration</span> <span class="o">*</span> <span 
class="mi">60</span><span class="p">))</span>
+        <span class="c"># Extract and sum teamname/score pairs from the event 
data.</span>
+        <span class="o">|</span> <span class="s">'ExtractTeamScore'</span> 
<span class="o">&gt;&gt;</span> <span class="n">ExtractAndSumScore</span><span 
class="p">(</span><span class="s">'team'</span><span class="p">))</span>
+
+
+<span class="k">def</span> <span class="nf">run</span><span 
class="p">(</span><span class="n">argv</span><span class="o">=</span><span 
class="bp">None</span><span class="p">):</span>
+  <span class="s">"""Main entry point; defines and runs the hourly_team_score 
pipeline."""</span>
+  <span class="o">...</span>
+
+  <span class="n">known_args</span><span class="p">,</span> <span 
class="n">pipeline_args</span> <span class="o">=</span> <span 
class="n">parser</span><span class="o">.</span><span 
class="n">parse_known_args</span><span class="p">(</span><span 
class="n">argv</span><span class="p">)</span>
+
+  <span class="n">pipeline_options</span> <span class="o">=</span> <span 
class="n">PipelineOptions</span><span class="p">(</span><span 
class="n">pipeline_args</span><span class="p">)</span>
+  <span class="n">p</span> <span class="o">=</span> <span 
class="n">beam</span><span class="o">.</span><span 
class="n">Pipeline</span><span class="p">(</span><span 
class="n">options</span><span class="o">=</span><span 
class="n">pipeline_options</span><span class="p">)</span>
+  <span class="n">pipeline_options</span><span class="o">.</span><span 
class="n">view_as</span><span class="p">(</span><span 
class="n">SetupOptions</span><span class="p">)</span><span 
class="o">.</span><span class="n">save_main_session</span> <span 
class="o">=</span> <span class="bp">True</span>
+
+  <span class="p">(</span><span class="n">p</span>  <span class="c"># pylint: 
disable=expression-not-assigned</span>
+   <span class="o">|</span> <span class="n">ReadFromText</span><span 
class="p">(</span><span class="n">known_args</span><span 
class="o">.</span><span class="nb">input</span><span class="p">)</span>
+   <span class="o">|</span> <span class="n">HourlyTeamScore</span><span 
class="p">(</span>
+       <span class="n">known_args</span><span class="o">.</span><span 
class="n">start_min</span><span class="p">,</span> <span 
class="n">known_args</span><span class="o">.</span><span 
class="n">stop_min</span><span class="p">,</span> <span 
class="n">known_args</span><span class="o">.</span><span 
class="n">window_duration</span><span class="p">)</span>
+   <span class="o">|</span> <span 
class="n">WriteWindowedToBigQuery</span><span class="p">(</span>
+       <span class="n">known_args</span><span class="o">.</span><span 
class="n">table_name</span><span class="p">,</span> <span 
class="n">known_args</span><span class="o">.</span><span 
class="n">dataset</span><span class="p">,</span> <span 
class="n">configure_bigquery_write</span><span class="p">()))</span>
+
+  <span class="n">result</span> <span class="o">=</span> <span 
class="n">p</span><span class="o">.</span><span class="n">run</span><span 
class="p">()</span>
+  <span class="n">result</span><span class="o">.</span><span 
class="n">wait_until_finish</span><span class="p">()</span>
+</code></pre>
+</div>
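To see why applying `ExtractAndSumScore('team')` after windowing yields one total per team and per hour, the plain-Python sketch below keys each score by the pair `(team, window_start)` instead of by team alone. The names and sample data are hypothetical, and timestamps are assumed to be epoch milliseconds. In the real pipeline the runner tracks each element's window itself, so the key handed to `CombinePerKey` stays just the team.

```python
from collections import defaultdict
from typing import Dict, Iterable, Mapping, Tuple

WINDOW_SECONDS = 60 * 60  # 1-hour fixed windows


def hourly_team_scores(events: Iterable[Mapping]) -> Dict[Tuple[str, float], int]:
    """Sum scores keyed by (team, window start in seconds)."""
    totals: Dict[Tuple[str, float], int] = defaultdict(int)
    for event in events:
        seconds = event["timestamp"] / 1000.0  # milliseconds -> seconds
        window_start = seconds - (seconds % WINDOW_SECONDS)
        totals[(event["team"], window_start)] += event["score"]
    return dict(totals)


# Hypothetical events on 1970-01-01 (UTC), to keep the numbers small.
events = [
    {"team": "red", "score": 10, "timestamp": 3_600_000},  # 01:00:00
    {"team": "red", "score": 4, "timestamp": 5_400_000},   # 01:30:00, same window
    {"team": "red", "score": 6, "timestamp": 7_200_000},   # 02:00:00, next window
    {"team": "blue", "score": 9, "timestamp": 3_700_000},  # 01:01:40
]
print(hourly_team_scores(events))
# {('red', 3600.0): 14, ('red', 7200.0): 6, ('blue', 3600.0): 9}
```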
+
 <h3 id="limitations-1">Limitations</h3>
 
 <p>As written, <code class="highlighter-rouge">HourlyTeamScore</code> still 
has a limitation:</p>
@@ -474,6 +619,10 @@
 
 <h2 
id="leaderboard-streaming-processing-with-real-time-game-data">LeaderBoard: 
Streaming Processing with Real-Time Game Data</h2>
 
+<blockquote>
+  <p><strong>Note:</strong> This example currently exists in Java only.</p>
+</blockquote>
+
 <p>One way we can help address the latency issue present in the <code 
class="highlighter-rouge">UserScore</code> and <code 
class="highlighter-rouge">HourlyTeamScore</code> pipelines is by reading the 
score data from an unbounded source. The <code 
class="highlighter-rouge">LeaderBoard</code> pipeline introduces streaming 
processing by reading the game score data from an unbounded source that 
produces an infinite amount of data, rather than from a file on the game 
server.</p>
 
 <p>The <code class="highlighter-rouge">LeaderBoard</code> pipeline also 
demonstrates how to process game score data with respect to both <em>processing 
time</em> and <em>event time</em>. <code 
class="highlighter-rouge">LeaderBoard</code> outputs data about both individual 
user scores and about team scores, each with respect to a different time 
frame.</p>
@@ -607,6 +756,10 @@
 
 <h2 id="gamestats-abuse-detection-and-usage-analysis">GameStats: Abuse 
Detection and Usage Analysis</h2>
 
+<blockquote>
+  <p><strong>Note:</strong> This example currently exists in Java only.</p>
+</blockquote>
+
 <p>While <code class="highlighter-rouge">LeaderBoard</code> demonstrates how 
to use basic windowing and triggers to perform low-latency and flexible data 
analysis, we can use more advanced windowing techniques to perform more 
comprehensive analysis. This might include some calculations designed to detect 
system abuse (like spam) or to gain insight into user behavior. The <code 
class="highlighter-rouge">GameStats</code> pipeline builds on the low-latency 
functionality in <code class="highlighter-rouge">LeaderBoard</code> to 
demonstrate how you can use Beam to perform this kind of advanced analysis.</p>
 
 <p>Like <code class="highlighter-rouge">LeaderBoard</code>, <code 
class="highlighter-rouge">GameStats</code> reads data from an unbounded source. 
It is best thought of as an ongoing job that provides insight into the game as 
users play.</p>
