This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git
commit 21477ff9ba5ffff06f84da4dc3e6155046098503
Author: Mergebot <[email protected]>
AuthorDate: Tue Jan 30 09:59:27 2018 -0800

    Prepare repository for deployment.
---
 .../2016/10/11/strata-hadoop-world-and-beam.html   |   2 +-
 content/blog/2016/10/20/test-stream.html           |   4 +-
 content/documentation/io/testing/index.html        |   2 +-
 content/documentation/programming-guide/index.html |   7 +-
 content/feed.xml                                   |   4 +-
 .../get-started/mobile-gaming-example/index.html   | 193 +++++++++++----------
 6 files changed, 113 insertions(+), 99 deletions(-)

diff --git a/content/beam/update/2016/10/11/strata-hadoop-world-and-beam.html b/content/beam/update/2016/10/11/strata-hadoop-world-and-beam.html
index 9c939a8..f42292e 100644
--- a/content/beam/update/2016/10/11/strata-hadoop-world-and-beam.html
+++ b/content/beam/update/2016/10/11/strata-hadoop-world-and-beam.html
@@ -95,7 +95,7 @@
 <p><img src="/images/blog/IMG_20160927_170956.jpg" alt="Exercise time" /></p>
-<p>If you want to take a look at the tutorial materials, we’ve put them up <a href="https://github.com/eljefe6a/beamexample">on GitHub</a>. This includes the <a href="https://github.com/eljefe6a/beamexample/blob/master/BeamTutorial/slides.pdf">actual slides</a> as well as the <a href="https://github.com/eljefe6a/beamexample/tree/master/BeamTutorial/src/main/java/org/apache/beam/examples/tutorial/game">exercises</a> that we covered. If you’re looking to learn a little about Beam, this is [...]
+<p>If you want to take a look at the tutorial materials, we’ve put them up <a href="https://github.com/eljefe6a/beamexample">on GitHub</a>. This includes the <a href="https://github.com/eljefe6a/beamexample/blob/master/BeamTutorial/slides.pdf">actual slides</a> as well as the <a href="https://github.com/eljefe6a/beamexample/tree/master/BeamTutorial/src/main/java/org/apache/beam/examples/tutorial/game">exercises</a> that we covered. If you’re looking to learn a little about Beam, this is [...]
 <p>I want to share some of takeaways I had about Beam during the conference.</p>
diff --git a/content/blog/2016/10/20/test-stream.html b/content/blog/2016/10/20/test-stream.html
index f98d239..f2cc8bb 100644
--- a/content/blog/2016/10/20/test-stream.html
+++ b/content/blog/2016/10/20/test-stream.html
@@ -128,8 +128,8 @@ from the Mobile Gaming example series.</p>
 <h2 id="leaderboard-and-the-mobile-gaming-example">LeaderBoard and the Mobile Gaming Example</h2>
-<p><a href="https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java#L177">LeaderBoard</a>
-is part of the <a href="https://github.com/apache/beam/tree/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game">Beam mobile gaming examples</a>
+<p><a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java#L177">LeaderBoard</a>
+is part of the <a href="https://github.com/apache/beam/tree/master/examples/java/src/main/java/org/apache/beam/examples/complete/game">Beam mobile gaming examples</a>
 (and <a href="/get-started/mobile-gaming-example/">walkthroughs</a>) which produces a continuous accounting of user and team scores.
User scores are calculated over the lifetime of the program, while team scores are calculated diff --git a/content/documentation/io/testing/index.html b/content/documentation/io/testing/index.html index bb0b1b5..4a3eb11 100644 --- a/content/documentation/io/testing/index.html +++ b/content/documentation/io/testing/index.html @@ -832,7 +832,7 @@ dynamic_pipeline_options: <li>Having your test take a pipeline option that decides whether to generate a small or large amount of test data (where small and large are sizes appropriate to your data store)</li> </ol> -<p>An example of this is <a href="https://github.com/apache/beam/tree/master/sdks/java/io/hadoop/input-format">HadoopInputFormatIO</a>’s tests.</p> +<p>An example of this is <a href="https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-input-format">HadoopInputFormatIO</a>’s tests.</p> <!-- # Next steps diff --git a/content/documentation/programming-guide/index.html b/content/documentation/programming-guide/index.html index 1794b85..6db8a56 100644 --- a/content/documentation/programming-guide/index.html +++ b/content/documentation/programming-guide/index.html @@ -1391,10 +1391,9 @@ followed by a <code class="highlighter-rouge">ParDo</code> to consume the result and format data from each collection.</p> <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PCollection</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">CoGbkResult</span><span class="o">>></span> <span class="n">results</span> <span class="o">=</span> - <span class="n">KeyedPCollectionTuple</span> - <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">emails</span><span class="o">)</span> - <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span 
class="o">,</span> <span class="n">phones</span><span class="o">)</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">CoGroupByKey</span><span class="o">.<</span><span class="n">String</span><span class="o">></span><span class="n">create</span><span class="o">());</span> + <span class="n">KeyedPCollectionTuple</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">emails</span><span class="o">)</span> + <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">,</span> <span class="n">phones</span><span class="o">)</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">CoGroupByKey</span><span class="o">.</span><span class="na">create</span><span class="o">());</span> <span class="n">PCollection</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">contactLines</span> <span class="o">=</span> <span class="n">results</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span class="o">(</span> <span class="k">new</span> <span class="n">DoFn</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">CoGbkResult</span><span class="o">>,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> diff --git a/content/feed.xml b/content/feed.xml index 8e0966a..2c48dfa 100644 --- a/content/feed.xml +++ b/content/feed.xml @@ -2192,8 +2192,8 @@ from the Mobile Gaming example series.</p> <h2 id="leaderboard-and-the-mobile-gaming-example">LeaderBoard and the Mobile Gaming Example</h2> -<p><a 
href="https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java#L177">LeaderBoard</a> -is part of the <a href="https://github.com/apache/beam/tree/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game">Beam mobile gaming examples</a> +<p><a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java#L177">LeaderBoard</a> +is part of the <a href="https://github.com/apache/beam/tree/master/examples/java/src/main/java/org/apache/beam/examples/complete/game">Beam mobile gaming examples</a> (and <a href="/get-started/mobile-gaming-example/">walkthroughs</a>) which produces a continuous accounting of user and team scores. User scores are calculated over the lifetime of the program, while team scores are calculated diff --git a/content/get-started/mobile-gaming-example/index.html b/content/get-started/mobile-gaming-example/index.html index 91c260f..9714bbb 100644 --- a/content/get-started/mobile-gaming-example/index.html +++ b/content/get-started/mobile-gaming-example/index.html @@ -230,7 +230,7 @@ looks more like what is depicted by the red squiggly line above the ideal line.< <p>The <code class="highlighter-rouge">UserScore</code> pipeline is the simplest example for processing mobile game data. <code class="highlighter-rouge">UserScore</code> determines the total score per user over a finite data set (for example, one day’s worth of scores stored on the game server). Pipelines like <code class="highlighter-rouge">UserScore</code> are best run periodically after all relevant data has been gathered. For example, <code class="highlighter-rouge">UserScore</code> [...] 
<blockquote class="language-java"> - <p><strong>Note:</strong> See <a href="https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java">UserScore on GitHub</a> for the complete example pipeline program.</p> + <p><strong>Note:</strong> See <a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java">UserScore on GitHub</a> for the complete example pipeline program.</p> </blockquote> <blockquote class="language-py"> @@ -281,10 +281,11 @@ looks more like what is depicted by the red squiggly line above the ideal line.< <span class="n">PCollection</span><span class="o"><</span><span class="n">GameActionInfo</span><span class="o">></span> <span class="n">gameInfo</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">gameInfo</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">MapElements</span> - <span class="o">.</span><span class="na">into</span><span class="o">(</span><span class="n">TypeDescriptors</span><span class="o">.</span><span class="na">kvs</span><span class="o">(</span><span class="n">TypeDescriptors</span><span class="o">.</span><span class="na">strings</span><span class="o">(),</span> <span class="n">TypeDescriptors</span><span class="o">.</span><span class="na">integers</span><span class="o">()))</span> - <span class="o">.</span><span class="na">via</span><span class="o">((</span><span class="n">GameActionInfo</span> <span class="n">gInfo</span><span class="o">)</span> <span class="o">-></span> <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">gInfo</span><span class="o">.</span><span class="na">getKey</span><span class="o">(</span><span class="n">field</span><span class="o">),</span> <span class="n">gInfo</span [...] 
- <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Sum</span><span class="o">.<</span><span class="n">String</span><span class="o">></span><span class="n">integersPerKey</span><span class="o">());</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="n">MapElements</span><span class="o">.</span><span class="na">into</span><span class="o">(</span> + <span class="n">TypeDescriptors</span><span class="o">.</span><span class="na">kvs</span><span class="o">(</span><span class="n">TypeDescriptors</span><span class="o">.</span><span class="na">strings</span><span class="o">(),</span> <span class="n">TypeDescriptors</span><span class="o">.</span><span class="na">integers</span><span class="o">()))</span> + <span class="o">.</span><span class="na">via</span><span class="o">((</span><span class="n">GameActionInfo</span> <span class="n">gInfo</span><span class="o">)</span> <span class="o">-></span> <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">gInfo</span><span class="o">.</span><span class="na">getKey</span><span class="o">(</span><span class="n">field</span><span class="o">),</span> <span class="n">gInfo [...] 
+ <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Sum</span><span class="o">.</span><span class="na">integersPerKey</span><span class="o">());</span> <span class="o">}</span> <span class="o">}</span> </code></pre> @@ -321,11 +322,7 @@ looks more like what is depicted by the red squiggly line above the ideal line.< <span class="c1">// Extract and sum username/score pairs from the event data.</span> <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"ExtractUserScore"</span><span class="o">,</span> <span class="k">new</span> <span class="n">ExtractAndSumScore</span><span class="o">(</span><span class="s">"user"</span><span class="o">))</span> <span class="o">.</span><span class="na">apply</span><span class="o">(</span> - <span class="s">"WriteUserScoreSums"</span><span class="o">,</span> - <span class="k">new</span> <span class="n">WriteToText</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>>(</span> - <span class="n">options</span><span class="o">.</span><span class="na">getOutput</span><span class="o">(),</span> - <span class="n">configureOutput</span><span class="o">(),</span> - <span class="kc">false</span><span class="o">));</span> + <span class="s">"WriteUserScoreSums"</span><span class="o">,</span> <span class="k">new</span> <span class="n">WriteToText</span><span class="o"><>(</span><span class="n">options</span><span class="o">.</span><span class="na">getOutput</span><span class="o">(),</span> <span class="n">configureOutput</span><span class="o">(),</span> <span class="kc">false</span><span class="o">));</span> <span class="c1">// Run the batch pipeline.</span> <span class="n">pipeline</span><span class="o">.</span><span class="na">run</span><span class="o">().</span><span class="na">waitUntilFinish</span><span class="o">();</span> @@ -396,7 
+393,7 @@ looks more like what is depicted by the red squiggly line above the ideal line.< <p>Like <code class="highlighter-rouge">UserScore</code>, <code class="highlighter-rouge">HourlyTeamScore</code> is best thought of as a job to be run periodically after all the relevant data has been gathered (such as once per day). The pipeline reads a fixed data set from a file, and writes the results to a Google Cloud BigQuery table.</p> <blockquote class="language-java"> - <p><strong>Note:</strong> See <a href="https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java">HourlyTeamScore on GitHub</a> for the complete example pipeline program.</p> + <p><strong>Note:</strong> See <a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java">HourlyTeamScore on GitHub</a> for the complete example pipeline program.</p> </blockquote> <blockquote class="language-py"> @@ -444,10 +441,12 @@ logical windows based on when those scores occurred in event time.</em></p> <p>The following code shows this:</p> <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Add an element timestamp based on the event log, and apply fixed windowing.</span> -<span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"AddEventTimestamps"</span><span class="o">,</span> - <span class="n">WithTimestamps</span><span class="o">.</span><span class="na">of</span><span class="o">((</span><span class="n">GameActionInfo</span> <span class="n">i</span><span class="o">)</span> <span class="o">-></span> <span class="k">new</span> <span class="n">Instant</span><span class="o">(</span><span class="n">i</span><span class="o">.</span><span class="na">getTimestamp</span><span class="o">())))</span> -<span class="o">.</span><span class="na">apply</span><span class="o">(</span><span 
class="s">"FixedWindowsTeam"</span><span class="o">,</span> <span class="n">Window</span><span class="o">.<</span><span class="n">GameActionInfo</span><span class="o">></span><span class="n">into</span><span class="o">(</span> - <span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="n">options</span><span class="o">.</span><span class="na">getWindowDuration</span><span class="o">()))))</span> +<span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="s">"AddEventTimestamps"</span><span class="o">,</span> + <span class="n">WithTimestamps</span><span class="o">.</span><span class="na">of</span><span class="o">((</span><span class="n">GameActionInfo</span> <span class="n">i</span><span class="o">)</span> <span class="o">-></span> <span class="k">new</span> <span class="n">Instant</span><span class="o">(</span><span class="n">i</span><span class="o">.</span><span class="na">getTimestamp</span><span class="o">())))</span> +<span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="s">"FixedWindowsTeam"</span><span class="o">,</span> + <span class="n">Window</span><span class="o">.</span><span class="na">into</span><span class="o">(</span><span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="n">options</span><span class="o">.</span><span class="na">getWindowDuration</span><span class="o">()))))</span> </code></pre> </div> <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># Add an element timestamp based on the event log, and apply fixed</span> @@ -471,12 +470,14 @@ logical windows based on when those 
scores occurred in event time.</em></p> <p>The following code shows how <code class="highlighter-rouge">HourlyTeamScore</code> uses the <code class="highlighter-rouge">Filter</code> transform to filter events that occur either before or after the relevant analysis period:</p> -<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"FilterStartTime"</span><span class="o">,</span> <span class="n">Filter</span><span class="o">.</span><span class="na">by</span><span class="o">(</span> - <span class="o">(</span><span class="n">GameActionInfo</span> <span class="n">gInfo</span><span class="o">)</span> - <span class="o">-></span> <span class="n">gInfo</span><span class="o">.</span><span class="na">getTimestamp</span><span class="o">()</span> <span class="o">></span> <span class="n">startMinTimestamp</span><span class="o">.</span><span class="na">getMillis</span><span class="o">()))</span> -<span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"FilterEndTime"</span><span class="o">,</span> <span class="n">Filter</span><span class="o">.</span><span class="na">by</span><span class="o">(</span> - <span class="o">(</span><span class="n">GameActionInfo</span> <span class="n">gInfo</span><span class="o">)</span> - <span class="o">-></span> <span class="n">gInfo</span><span class="o">.</span><span class="na">getTimestamp</span><span class="o">()</span> <span class="o"><</span> <span class="n">stopMinTimestamp</span><span class="o">.</span><span class="na">getMillis</span><span class="o">()))</span> +<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="s">"FilterStartTime"</span><span class="o">,</span> + <span class="n">Filter</span><span class="o">.</span><span class="na">by</span><span class="o">(</span> + <span 
class="o">(</span><span class="n">GameActionInfo</span> <span class="n">gInfo</span><span class="o">)</span> <span class="o">-></span> <span class="n">gInfo</span><span class="o">.</span><span class="na">getTimestamp</span><span class="o">()</span> <span class="o">></span> <span class="n">startMinTimestamp</span><span class="o">.</span><span class="na">getMillis</span><span class="o">()))</span> +<span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="s">"FilterEndTime"</span><span class="o">,</span> + <span class="n">Filter</span><span class="o">.</span><span class="na">by</span><span class="o">(</span> + <span class="o">(</span><span class="n">GameActionInfo</span> <span class="n">gInfo</span><span class="o">)</span> <span class="o">-></span> <span class="n">gInfo</span><span class="o">.</span><span class="na">getTimestamp</span><span class="o">()</span> <span class="o"><</span> <span class="n">stopMinTimestamp</span><span class="o">.</span><span class="na">getMillis</span><span class="o">()))</span> </code></pre> </div> <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="o">|</span> <span class="s">'FilterStartTime'</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Filter</span><span class="p">(</span> @@ -499,40 +500,43 @@ logical windows based on when those scores occurred in event time.</em></p> <span class="kd">final</span> <span class="n">Instant</span> <span class="n">startMinTimestamp</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Instant</span><span class="o">(</span><span class="n">minFmt</span><span class="o">.</span><span class="na">parseMillis</span><span class="o">(</span><span class="n">options</span><span class="o">.</span><span class="na">getStartMin</span><span class="o">()));</span> <span class="c1">// Read 'gaming' events from a text file.</span> - <span class="n">pipeline</span><span 
class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">TextIO</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">from</span><span class="o">(</span><span class="n">options</span><span class="o">.</span><span class="na">getInput</span><span class="o">()))</span> - <span class="c1">// Parse the incoming data.</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"ParseGameEvent"</span><span class="o">,</span> <span class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">ParseEventFn</span><span class="o">()))</span> - - <span class="c1">// Filter out data before and after the given times so that it is not included</span> - <span class="c1">// in the calculations. As we collect data in batches (say, by day), the batch for the day</span> - <span class="c1">// that we want to analyze could potentially include some late-arriving data from the previous</span> - <span class="c1">// day. If so, we want to weed it out. 
Similarly, if we include data from the following day</span> - <span class="c1">// (to scoop up late-arriving events from the day we're analyzing), we need to weed out events</span> - <span class="c1">// that fall after the time period we want to analyze.</span> - <span class="c1">// [START DocInclude_HTSFilters]</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"FilterStartTime"</span><span class="o">,</span> <span class="n">Filter</span><span class="o">.</span><span class="na">by</span><span class="o">(</span> - <span class="o">(</span><span class="n">GameActionInfo</span> <span class="n">gInfo</span><span class="o">)</span> - <span class="o">-></span> <span class="n">gInfo</span><span class="o">.</span><span class="na">getTimestamp</span><span class="o">()</span> <span class="o">></span> <span class="n">startMinTimestamp</span><span class="o">.</span><span class="na">getMillis</span><span class="o">()))</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"FilterEndTime"</span><span class="o">,</span> <span class="n">Filter</span><span class="o">.</span><span class="na">by</span><span class="o">(</span> - <span class="o">(</span><span class="n">GameActionInfo</span> <span class="n">gInfo</span><span class="o">)</span> - <span class="o">-></span> <span class="n">gInfo</span><span class="o">.</span><span class="na">getTimestamp</span><span class="o">()</span> <span class="o"><</span> <span class="n">stopMinTimestamp</span><span class="o">.</span><span class="na">getMillis</span><span class="o">()))</span> - <span class="c1">// [END DocInclude_HTSFilters]</span> - - <span class="c1">// [START DocInclude_HTSAddTsAndWindow]</span> - <span class="c1">// Add an element timestamp based on the event log, and apply fixed windowing.</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"AddEventTimestamps"</span><span 
class="o">,</span> - <span class="n">WithTimestamps</span><span class="o">.</span><span class="na">of</span><span class="o">((</span><span class="n">GameActionInfo</span> <span class="n">i</span><span class="o">)</span> <span class="o">-></span> <span class="k">new</span> <span class="n">Instant</span><span class="o">(</span><span class="n">i</span><span class="o">.</span><span class="na">getTimestamp</span><span class="o">())))</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"FixedWindowsTeam"</span><span class="o">,</span> <span class="n">Window</span><span class="o">.<</span><span class="n">GameActionInfo</span><span class="o">></span><span class="n">into</span><span class="o">(</span> - <span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="n">options</span><span class="o">.</span><span class="na">getWindowDuration</span><span class="o">()))))</span> - <span class="c1">// [END DocInclude_HTSAddTsAndWindow]</span> + <span class="n">pipeline</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">TextIO</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">from</span><span class="o">(</span><span class="n">options</span><span class="o">.</span><span class="na">getInput</span><span class="o">()))</span> + <span class="c1">// Parse the incoming data.</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"ParseGameEvent"</span><span class="o">,</span> <span class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">ParseEventFn</span><span class="o">()))</span> - <span class="c1">// Extract and sum teamname/score 
pairs from the event data.</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"ExtractTeamScore"</span><span class="o">,</span> <span class="k">new</span> <span class="n">ExtractAndSumScore</span><span class="o">(</span><span class="s">"team"</span><span class="o">))</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"WriteTeamScoreSums"</span><span class="o">,</span> - <span class="k">new</span> <span class="n">WriteToText</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>>(</span> - <span class="n">options</span><span class="o">.</span><span class="na">getOutput</span><span class="o">(),</span> - <span class="n">configureOutput</span><span class="o">(),</span> - <span class="kc">true</span><span class="o">));</span> + <span class="c1">// Filter out data before and after the given times so that it is not included</span> + <span class="c1">// in the calculations. As we collect data in batches (say, by day), the batch for the day</span> + <span class="c1">// that we want to analyze could potentially include some late-arriving data from the</span> + <span class="c1">// previous day.</span> + <span class="c1">// If so, we want to weed it out. 
Similarly, if we include data from the following day</span> + <span class="c1">// (to scoop up late-arriving events from the day we're analyzing), we need to weed out</span> + <span class="c1">// events that fall after the time period we want to analyze.</span> + <span class="c1">// [START DocInclude_HTSFilters]</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="s">"FilterStartTime"</span><span class="o">,</span> + <span class="n">Filter</span><span class="o">.</span><span class="na">by</span><span class="o">(</span> + <span class="o">(</span><span class="n">GameActionInfo</span> <span class="n">gInfo</span><span class="o">)</span> <span class="o">-></span> <span class="n">gInfo</span><span class="o">.</span><span class="na">getTimestamp</span><span class="o">()</span> <span class="o">></span> <span class="n">startMinTimestamp</span><span class="o">.</span><span class="na">getMillis</span><span class="o">()))</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="s">"FilterEndTime"</span><span class="o">,</span> + <span class="n">Filter</span><span class="o">.</span><span class="na">by</span><span class="o">(</span> + <span class="o">(</span><span class="n">GameActionInfo</span> <span class="n">gInfo</span><span class="o">)</span> <span class="o">-></span> <span class="n">gInfo</span><span class="o">.</span><span class="na">getTimestamp</span><span class="o">()</span> <span class="o"><</span> <span class="n">stopMinTimestamp</span><span class="o">.</span><span class="na">getMillis</span><span class="o">()))</span> + <span class="c1">// [END DocInclude_HTSFilters]</span> + + <span class="c1">// [START DocInclude_HTSAddTsAndWindow]</span> + <span class="c1">// Add an element timestamp based on the event log, and apply fixed windowing.</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="s">"AddEventTimestamps"</span><span 
class="o">,</span> + <span class="n">WithTimestamps</span><span class="o">.</span><span class="na">of</span><span class="o">((</span><span class="n">GameActionInfo</span> <span class="n">i</span><span class="o">)</span> <span class="o">-></span> <span class="k">new</span> <span class="n">Instant</span><span class="o">(</span><span class="n">i</span><span class="o">.</span><span class="na">getTimestamp</span><span class="o">())))</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="s">"FixedWindowsTeam"</span><span class="o">,</span> + <span class="n">Window</span><span class="o">.</span><span class="na">into</span><span class="o">(</span><span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="n">options</span><span class="o">.</span><span class="na">getWindowDuration</span><span class="o">()))))</span> + <span class="c1">// [END DocInclude_HTSAddTsAndWindow]</span> + + <span class="c1">// Extract and sum teamname/score pairs from the event data.</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"ExtractTeamScore"</span><span class="o">,</span> <span class="k">new</span> <span class="n">ExtractAndSumScore</span><span class="o">(</span><span class="s">"team"</span><span class="o">))</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="s">"WriteTeamScoreSums"</span><span class="o">,</span> <span class="k">new</span> <span class="n">WriteToText</span><span class="o"><>(</span><span class="n">options</span><span class="o">.</span><span class="na">getOutput</span><span class="o">(),</span> <span class="n">configureOutput</span><span class="o">(),</span> <span class="kc">true</span><span class="o">));</span> <span class="n">pipeline</span><span 
class="o">.</span><span class="na">run</span><span class="o">().</span><span class="na">waitUntilFinish</span><span class="o">();</span> <span class="o">}</span> @@ -663,7 +667,7 @@ logical windows based on when those scores occurred in event time.</em></p> <p>Because the <code class="highlighter-rouge">LeaderBoard</code> pipeline reads the game data from an unbounded source as that data is generated, you can think of the pipeline as an ongoing job running concurrently with the game process. <code class="highlighter-rouge">LeaderBoard</code> can thus provide low-latency insights into how users are playing the game at any given moment — useful if, for example, we want to provide a live web-based scoreboard so that users can track their progr [...] <blockquote class="language-java"> - <p><strong>Note:</strong> See <a href="https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java">LeaderBoard on GitHub</a> for the complete example pipeline program.</p> + <p><strong>Note:</strong> See <a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java">LeaderBoard on GitHub</a> for the complete example pipeline program.</p> </blockquote> <blockquote class="language-py"> @@ -852,7 +856,7 @@ late results.</em></p> <p>Like <code class="highlighter-rouge">LeaderBoard</code>, <code class="highlighter-rouge">GameStats</code> reads data from an unbounded source. 
It is best thought of as an ongoing job that provides insight into the game as users play.</p> <blockquote class="language-java"> - <p><strong>Note:</strong> See <a href="https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java">GameStats on GitHub</a> for the complete example pipeline program.</p> + <p><strong>Note:</strong> See <a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java">GameStats on GitHub</a> for the complete example pipeline program.</p> </blockquote> <blockquote class="language-py"> @@ -889,12 +893,12 @@ late results.</em></p> <span class="kd">public</span> <span class="n">PCollection</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="nf">expand</span><span class="o">(</span><span class="n">PCollection</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span cla [...] 
<span class="c1">// Get the sum of scores for each user.</span> - <span class="n">PCollection</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">sumScores</span> <span class="o">=</span> <span class="n">userScores</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"UserSum"</span><span class="o">,</span> <span class="n">Sum</span><span class="o">.<</span><span class="n">String</span><span class="o">></span><span class="n">integersPerKey</span><span class="o">());</span> + <span class="n">PCollection</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">sumScores</span> <span class="o">=</span> + <span class="n">userScores</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"UserSum"</span><span class="o">,</span> <span class="n">Sum</span><span class="o">.</span><span class="na">integersPerKey</span><span class="o">());</span> <span class="c1">// Extract the score from each element, and use it to find the global mean.</span> - <span class="kd">final</span> <span class="n">PCollectionView</span><span class="o"><</span><span class="n">Double</span><span class="o">></span> <span class="n">globalMeanScore</span> <span class="o">=</span> <span class="n">sumScores</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Values</span><span class="o">.<</span><span class="n">Integer</span><span class="o">></span><span class="n">create</span><span class="o">())</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Mean</span><span class="o">.<</span><span class="n">Integer</span><span 
class="o">></span><span class="n">globally</span><span class="o">().</span><span class="na">asSingletonView</span><span class="o">());</span> + <span class="kd">final</span> <span class="n">PCollectionView</span><span class="o"><</span><span class="n">Double</span><span class="o">></span> <span class="n">globalMeanScore</span> <span class="o">=</span> + <span class="n">sumScores</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Values</span><span class="o">.</span><span class="na">create</span><span class="o">()).</span><span class="na">apply</span><span class="o">(</span><span class="n">Mean</span><span class="o">.<</span><span class="n">Integer</span><span class="o">></span><span class="n">globally</span><span class="o">().</span><span class="na">asSingletonView</span><spa [...] <span class="c1">// Filter the user sums using the global mean.</span> <span class="n">PCollection</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">filtered</span> <span class="o">=</span> <span class="n">sumScores</span> @@ -961,21 +965,26 @@ late results.</em></p> <span class="c1">// suspected robots-- to filter out scores from those users from the sum.</span> <span class="c1">// Write the results to BigQuery.</span> <span class="n">rawEvents</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"WindowIntoFixedWindows"</span><span class="o">,</span> <span class="n">Window</span><span class="o">.<</span><span class="n">GameActionInfo</span><span class="o">></span><span class="n">into</span><span class="o">(</span> - <span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span 
class="o">(</span><span class="n">options</span><span class="o">.</span><span class="na">getFixedWindowDuration</span><span class="o">()))))</span> - <span class="c1">// Filter out the detected spammer users, using the side input derived above.</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"FilterOutSpammers"</span><span class="o">,</span> <span class="n">ParDo</span> - <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">DoFn</span><span class="o"><</span><span class="n">GameActionInfo</span><span class="o">,</span> <span class="n">GameActionInfo</span><span class="o">>()</span> <span class="o">{</span> - <span class="nd">@ProcessElement</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span><span class="n">ProcessContext</span> <span class="n">c</span><span class="o">)</span> <span class="o">{</span> - <span class="c1">// If the user is not in the spammers Map, output the data element.</span> - <span class="k">if</span> <span class="o">(</span><span class="n">c</span><span class="o">.</span><span class="na">sideInput</span><span class="o">(</span><span class="n">spammersView</span><span class="o">).</span><span class="na">get</span><span class="o">(</span><span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">().</span><span class="na">getUser</span><span class="o">().</span><span class="na">trim</span><span class="o">())</s [...] 
- <span class="n">c</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">());</span> - <span class="o">}</span> - <span class="o">}</span> - <span class="o">}).</span><span class="na">withSideInputs</span><span class="o">(</span><span class="n">spammersView</span><span class="o">))</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="s">"WindowIntoFixedWindows"</span><span class="o">,</span> + <span class="n">Window</span><span class="o">.</span><span class="na">into</span><span class="o">(</span> + <span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="n">options</span><span class="o">.</span><span class="na">getFixedWindowDuration</span><span class="o">()))))</span> + <span class="c1">// Filter out the detected spammer users, using the side input derived above.</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="s">"FilterOutSpammers"</span><span class="o">,</span> + <span class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span class="o">(</span> + <span class="k">new</span> <span class="n">DoFn</span><span class="o"><</span><span class="n">GameActionInfo</span><span class="o">,</span> <span class="n">GameActionInfo</span><span class="o">>()</span> <span class="o">{</span> + <span class="nd">@ProcessElement</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span><span class="n">ProcessContext</span> <span class="n">c</span><span class="o">)</span> <span class="o">{</span> + <span class="c1">// If the user is not in the spammers Map, output the data element.</span> + 
<span class="k">if</span> <span class="o">(</span><span class="n">c</span><span class="o">.</span><span class="na">sideInput</span><span class="o">(</span><span class="n">spammersView</span><span class="o">).</span><span class="na">get</span><span class="o">(</span><span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">().</span><span class="na">getUser</span><span class="o">().</span><span class="na">trim</span><span class="o"> [...] + <span class="n">c</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">());</span> + <span class="o">}</span> + <span class="o">}</span> + <span class="o">})</span> + <span class="o">.</span><span class="na">withSideInputs</span><span class="o">(</span><span class="n">spammersView</span><span class="o">))</span> <span class="c1">// Extract and sum teamname/score pairs from the event data.</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"ExtractTeamScore"</span><span class="o">,</span> <span class="k">new</span> <span class="n">ExtractAndSumScore</span><span class="o">(</span><span class="s">"team"</span><span class="o">))</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"ExtractTeamScore"</span><span class="o">,</span> <span class="k">new</span> <span class="n">ExtractAndSumScore</span><span class="o">(</span><span class="s">"team"</span><span class="o">))</span> </code></pre> </div> <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># Calculate the total score per team over fixed windows, and emit cumulative</span> @@ -1016,14 +1025,16 @@ between instances are.</em></p> <span class="c1">// This information could help the game designers track the changing user engagement</span> <span class="c1">// as their set of games 
changes.</span> <span class="n">userEvents</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"WindowIntoSessions"</span><span class="o">,</span> <span class="n">Window</span><span class="o">.<</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span><span class="n">into</span><span class="o">(</span> - <span class="n">Sessions</span><span class="o">.</span><span class="na">withGapDuration</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="n">options</span><span class="o">.</span><span class="na">getSessionGap</span><span class="o">())))</span> - <span class="o">.</span><span class="na">withTimestampCombiner</span><span class="o">(</span><span class="n">TimestampCombiner</span><span class="o">.</span><span class="na">END_OF_WINDOW</span><span class="o">))</span> - <span class="c1">// For this use, we care only about the existence of the session, not any particular</span> - <span class="c1">// information aggregated over it, so the following is an efficient way to do that.</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Combine</span><span class="o">.</span><span class="na">perKey</span><span class="o">(</span><span class="n">x</span> <span class="o">-></span> <span class="mi">0</span><span class="o">))</span> - <span class="c1">// Get the duration per session.</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"UserSessionActivity"</span><span class="o">,</span> <span class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">UserSessionInfoFn</span><span class="o">()))</span> + <span class="o">.</span><span 
class="na">apply</span><span class="o">(</span> + <span class="s">"WindowIntoSessions"</span><span class="o">,</span> + <span class="n">Window</span><span class="o">.<</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span><span class="n">into</span><span class="o">(</span> + <span class="n">Sessions</span><span class="o">.</span><span class="na">withGapDuration</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="n">options</span><span class="o">.</span><span class="na">getSessionGap</span><span class="o">())))</span> + <span class="o">.</span><span class="na">withTimestampCombiner</span><span class="o">(</span><span class="n">TimestampCombiner</span><span class="o">.</span><span class="na">END_OF_WINDOW</span><span class="o">))</span> + <span class="c1">// For this use, we care only about the existence of the session, not any particular</span> + <span class="c1">// information aggregated over it, so the following is an efficient way to do that.</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Combine</span><span class="o">.</span><span class="na">perKey</span><span class="o">(</span><span class="n">x</span> <span class="o">-></span> <span class="mi">0</span><span class="o">))</span> + <span class="c1">// Get the duration per session.</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"UserSessionActivity"</span><span class="o">,</span> <span class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">UserSessionInfoFn</span><span class="o">()))</span> </code></pre> </div> <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># Detect 
user sessions-- that is, a burst of activity separated by a gap</span> @@ -1048,16 +1059,20 @@ between instances are.</em></p> <p>This gives us a set of user sessions, each with an attached duration. We can then calculate the <em>average</em> session length by re-windowing the data into fixed time windows, and then calculating the average for all sessions that end in each hour:</p> <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Re-window to process groups of session sums according to when the sessions complete.</span> -<span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"WindowToExtractSessionMean"</span><span class="o">,</span> <span class="n">Window</span><span class="o">.<</span><span class="n">Integer</span><span class="o">></span><span class="n">into</span><span class="o">(</span> - <span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="n">options</span><span class="o">.</span><span class="na">getUserActivityWindowDuration</span><span class="o">()))))</span> +<span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="s">"WindowToExtractSessionMean"</span><span class="o">,</span> + <span class="n">Window</span><span class="o">.</span><span class="na">into</span><span class="o">(</span> + <span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="n">options</span><span class="o">.</span><span class="na">getUserActivityWindowDuration</span><span class="o">()))))</span> <span class="c1">// Find the mean session duration in each window.</span> <span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span class="n">Mean</span><span class="o">.<</span><span class="n">Integer</span><span class="o">></span><span class="n">globally</span><span class="o">().</span><span class="na">withoutDefaults</span><span class="o">())</span> <span class="c1">// Write this info to a BigQuery table.</span> -<span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"WriteAvgSessionLength"</span><span class="o">,</span> - <span class="k">new</span> <span class="n">WriteWindowedToBigQuery</span><span class="o"><</span><span class="n">Double</span><span class="o">>(</span> - <span class="n">options</span><span class="o">.</span><span class="na">as</span><span class="o">(</span><span class="n">GcpOptions</span><span class="o">.</span><span class="na">class</span><span class="o">).</span><span class="na">getProject</span><span class="o">(),</span> - <span class="n">options</span><span class="o">.</span><span class="na">getDataset</span><span class="o">(),</span> - <span class="n">options</span><span class="o">.</span><span class="na">getGameStatsTablePrefix</span><span class="o">()</span> <span class="o">+</span> <span class="s">"_sessions"</span><span class="o">,</span> <span class="n">configureSessionWindowWrite</span><span class="o">()));</span> +<span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="s">"WriteAvgSessionLength"</span><span class="o">,</span> + <span class="k">new</span> <span class="n">WriteWindowedToBigQuery</span><span class="o"><>(</span> + <span class="n">options</span><span class="o">.</span><span class="na">as</span><span class="o">(</span><span class="n">GcpOptions</span><span class="o">.</span><span class="na">class</span><span class="o">).</span><span class="na">getProject</span><span class="o">(),</span> + <span class="n">options</span><span class="o">.</span><span class="na">getDataset</span><span class="o">(),</span> + <span 
class="n">options</span><span class="o">.</span><span class="na">getGameStatsTablePrefix</span><span class="o">()</span> <span class="o">+</span> <span class="s">"_sessions"</span><span class="o">,</span> + <span class="n">configureSessionWindowWrite</span><span class="o">()));</span> </code></pre> </div> <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># Re-window to process groups of session sums according to when the</span>
