This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 10c3bbc435f5b580b7300ecc87d7b22d4499cfdc
Author: melissa <[email protected]>
AuthorDate: Thu Aug 31 10:23:06 2017 -0700

    [BEAM-893] Update mobile gaming example
---
 src/get-started/mobile-gaming-example.md | 479 ++++++-------------------------
 1 file changed, 83 insertions(+), 396 deletions(-)

diff --git a/src/get-started/mobile-gaming-example.md 
b/src/get-started/mobile-gaming-example.md
index 9e59274..c8d7606 100644
--- a/src/get-started/mobile-gaming-example.md
+++ b/src/get-started/mobile-gaming-example.md
@@ -11,7 +11,7 @@ redirect_from: /use/mobile-gaming-example/
 {:toc}
 
 <nav class="language-switcher">
-  <strong>Adapt for:</strong> 
+  <strong>Adapt for:</strong>
   <ul>
     <li data-type="language-java">Java SDK</li>
     <li data-type="language-py">Python SDK</li>
@@ -21,7 +21,7 @@ redirect_from: /use/mobile-gaming-example/
 This section provides a walkthrough of a series of example Apache Beam 
pipelines that demonstrate more complex functionality than the basic 
[WordCount]({{ site.baseurl }}/get-started/wordcount-example) examples. The 
pipelines in this section process data from a hypothetical game that users play 
on their mobile phones. The pipelines demonstrate processing at increasing 
levels of complexity; the first pipeline, for example, shows how to run a batch 
analysis job to obtain relatively simple  [...]
 
 {:.language-java}
-> **Note**: These examples assume some familiarity with the Beam programming 
model. If you haven't already, we recommend familiarizing yourself with the 
programming model documentation and running a basic example pipeline before 
continuing. Note also that these examples use the Java 8 lambda syntax, and 
thus require Java 8. However, you can create pipelines with equivalent 
functionality using Java 7. 
+> **Note**: These examples assume some familiarity with the Beam programming 
model. If you haven't already, we recommend familiarizing yourself with the 
programming model documentation and running a basic example pipeline before 
continuing. Note also that these examples use the Java 8 lambda syntax, and 
thus require Java 8. However, you can create pipelines with equivalent 
functionality using Java 7.
 
 {:.language-py}
 > **Note**: These examples assume some familiarity with the Beam programming 
 > model. If you haven't already, we recommend familiarizing yourself with the 
 > programming model documentation and running a basic example pipeline before 
 > continuing.
@@ -33,16 +33,16 @@ Every time a user plays an instance of our hypothetical 
mobile game, they genera
 - A score value for that particular instance of play.
 - A timestamp that records when the particular instance of play happened--this 
is the event time for each game data event.
 
-When the user completes an instance of the game, their phone sends the data 
event to a game server, where the data is logged and stored in a file. 
Generally the data is sent to the game server immediately upon completion. 
However, sometimes delays happen in the network or users play the game 
"offline", when their phones are out of contact with the server (such as on an 
airplane, or outside network coverage area). When the user's phone comes back 
into contact with the game server, the pho [...]
+When the user completes an instance of the game, their phone sends the data 
event to a game server, where the data is logged and stored in a file. 
Generally the data is sent to the game server immediately upon completion. 
However, sometimes delays can happen in the network at various points. Another 
possible scenario involves users who play the game "offline", when their phones 
are out of contact with the server (such as on an airplane, or outside network 
coverage area). When the user's  [...]
 
-The following diagram shows the ideal situation vs reality. The X-axis 
represents event time: the actual time a game event occurred. The Y-axis 
represents processing time: the time at which a game event was processed. 
Ideally, events should be processed as they occur, depicted by the dotted line 
in the diagram. However, in reality that is not the case and reality looks more 
like what is depicted by the red squiggly line.
+The following diagram shows the ideal situation (events are processed as they 
occur) vs. reality (there is often a time delay before processing).
 
 <figure id="fig1">
     <img src="{{ site.baseurl }}/images/gaming-example-basic.png"
          width="264" height="260"
          alt="Score data for three users.">
 </figure>
-Figure 1: Ideally, events are processed when they occur, with no delays.
+**Figure 1:** The X-axis represents event time: the actual time a game event 
occurred. The Y-axis represents processing time: the time at which a game event 
was processed. Ideally, events should be processed as they occur, depicted by 
the dotted line in the diagram. However, in reality that is not the case and 
reality looks more like what is depicted by the red squiggly line.
 
 The data events might be received by the game server significantly later than 
users generate them. This time difference (called **skew**) can have processing 
implications for pipelines that make calculations that consider when each score 
was generated. Such pipelines might track scores generated during each hour of 
a day, for example, or they calculate the length of time that users are 
continuously playing the game—both of which depend on each data record's event 
time.
 
@@ -50,7 +50,7 @@ Because some of our example pipelines use data files (like 
logs from the game se
 
 For pipelines that read unbounded game data from an unbounded source, the data 
source sets the intrinsic [timestamp]({{ site.baseurl 
}}/documentation/programming-guide/#pctimestamps) for each PCollection element 
to the appropriate event time.
 
-The Mobile Game example pipelines vary in complexity, from simple batch 
analysis to more complex pipelines that can perform real-time analysis and 
abuse detection. This section walks you through each example and demonstrates 
how to use Beam features like windowing and triggers to expand your pipeline's 
capabilites.
+The Mobile Gaming example pipelines vary in complexity, from simple batch 
analysis to more complex pipelines that can perform real-time analysis and 
abuse detection. This section walks you through each example and demonstrates 
how to use Beam features like windowing and triggers to expand your pipeline's 
capabilites.
 
 ## UserScore: Basic Score Processing in Batch
 
@@ -74,9 +74,9 @@ As the pipeline processes each event, the event score gets 
added to the sum tota
 
 `UserScore`'s basic pipeline flow does the following:
 
-1. Read the day's score data from a file stored in a text file.
+1. Read the day's score data from a text file.
 2. Sum the score values for each unique user by grouping each game event by 
user ID and combining the score values to get the total score for that 
particular user.
-3. Write the result data to a [Google Cloud 
BigQuery](https://cloud.google.com/bigquery/) table.
+3. Write the result data to a text file.
 
 The following diagram shows score data for several users over the pipeline 
analysis period. In the diagram, each data point is an event that results in 
one user/score pair:
 
@@ -85,110 +85,29 @@ The following diagram shows score data for several users 
over the pipeline analy
          width="900" height="263"
          alt="Score data for three users.">
 </figure>
-Figure 2: Score data for three users.
+**Figure 2:** Score data for three users.
 
 This example uses batch processing, and the diagram's Y axis represents 
processing time: the pipeline processes events lower on the Y-axis first, and 
events higher up the axis later. The diagram's X axis represents the event time 
for each game event, as denoted by that event's timestamp. Note that the 
individual events in the diagram are not processed by the pipeline in the same 
order as they occurred (according to their timestamps).
 
 After reading the score events from the input file, the pipeline groups all of 
those user/score pairs together and sums the score values into one total value 
per unique user. `UserScore` encapsulates the core logic for that step as the 
[user-defined composite transform]({{ site.baseurl 
}}/documentation/programming-guide/#transforms-composite) `ExtractAndSumScore`:
 
 ```java
-public static class ExtractAndSumScore
-    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, 
Integer>>> {
-
-  private final String field;
-
-  ExtractAndSumScore(String field) {
-    this.field = field;
-  }
-
-  @Override
-  public PCollection<KV<String, Integer>> expand(
-      PCollection<GameActionInfo> gameInfo) {
-
-    return gameInfo
-      .apply(MapElements
-          .into(
-              TypeDescriptors.kvs(TypeDescriptors.strings(), 
TypeDescriptors.integers()))
-          .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), 
gInfo.getScore())))
-      .apply(Sum.<String>integersPerKey());
-  }
-}
-```
-
+{% github_sample 
/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
 tag:DocInclude_USExtractXform
+%}```
 ```py
-class ExtractAndSumScore(beam.PTransform):
-  """A transform to extract key/score information and sum the scores.
-  The constructor argument `field` determines whether 'team' or 'user' info is
-  extracted.
-  """
-  def __init__(self, field):
-    super(ExtractAndSumScore, self).__init__()
-    self.field = field
-
-  def expand(self, pcoll):
-    return (pcoll
-            | beam.Map(lambda info: (info[self.field], info['score']))
-            | beam.CombinePerKey(sum_ints))
-
-def configure_bigquery_write():
-  return [
-      ('user', 'STRING', lambda e: e[0]),
-      ('total_score', 'INTEGER', lambda e: e[1]),
-  ]
-```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/user_score.py
 tag:extract_and_sum_score
+%}```
 
 `ExtractAndSumScore` is written to be more general, in that you can pass in 
the field by which you want to group the data (in the case of our game, by 
unique user or unique team). This means we can re-use `ExtractAndSumScore` in 
other pipelines that group score data by team, for example.
 
 Here's the main method of `UserScore`, showing how we apply all three steps of 
the pipeline:
 
 ```java
-public static void main(String[] args) throws Exception {
-  // Begin constructing a pipeline configured by commandline flags.
-  Options options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-  Pipeline pipeline = Pipeline.create(options);
-
-  // Read events from a text file and parse them.
-  pipeline.apply(TextIO.read().from(options.getInput()))
-    .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
-    // Extract and sum username/score pairs from the event data.
-    .apply("ExtractUserScore", new ExtractAndSumScore("user"))
-    .apply("WriteUserScoreSums",
-        new WriteToBigQuery<KV<String, Integer>>(options.getTableName(),
-                                                 configureBigQueryWrite()));
-
-  // Run the batch pipeline.
-  pipeline.run().waitUntilFinish();
-}
-```
-
+{% github_sample 
/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
 tag:DocInclude_USMain
+%}```
 ```py
-def run(argv=None):
-  """Main entry point; defines and runs the user_score pipeline."""
-  
-  ...
-
-  pipeline_options = PipelineOptions(pipeline_args)
-  p = beam.Pipeline(options=pipeline_options)
-
-  (p  # pylint: disable=expression-not-assigned
-   | ReadFromText(known_args.input) # Read events from a file and parse them.
-   | UserScore()
-   | WriteToBigQuery(
-       known_args.table_name, known_args.dataset, configure_bigquery_write()))
-
-  result = p.run()
-  result.wait_until_finish()
-```
-
-### Working with the Results
-
-`UserScore` writes the data to a BigQuery table (called `user_score` by 
default). With the data in the BigQuery table, we might perform a further 
interactive analysis, such as querying for a list of the N top-scoring users 
for a given day.
-
-Let's suppose we want to interactively determine the top 10 highest-scoring 
users for a given day. In the BigQuery user interface, we can run the following 
query:
-
-```
-SELECT * FROM [MyGameProject:MyGameDataset.user_score] ORDER BY total_score 
DESC LIMIT 10
-```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/user_score.py
 tag:main
+%}```
 
 ### Limitations
 
@@ -208,7 +127,7 @@ Starting with the next pipeline example, we'll discuss how 
you can use Beam's fe
 
 The `HourlyTeamScore` pipeline expands on the basic batch analysis principles 
used in the `UserScore` pipeline and improves upon some of its limitations. 
`HourlyTeamScore` performs finer-grained analysis, both by using additional 
features in the Beam SDKs, and taking into account more aspects of the game 
data. For example, `HourlyTeamScore` can filter out data that isn't part of the 
relevant analysis period.
 
-Like `UserScore`, `HourlyTeamScore` is best thought of as a job to be run 
periodically after all the relevant data has been gathered (such as once per 
day). The pipeline reads a fixed data set from a file, and writes the results 
to a Google Cloud BigQuery table, just like `UserScore`.
+Like `UserScore`, `HourlyTeamScore` is best thought of as a job to be run 
periodically after all the relevant data has been gathered (such as once per 
day). The pipeline reads a fixed data set from a file, and writes the results 
to a Google Cloud BigQuery table.
 
 {:.language-java}
 > **Note:** See [HourlyTeamScore on 
 > GitHub](https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java)
 >  for the complete example pipeline program.
@@ -237,7 +156,7 @@ The following diagram shows how the pipeline processes a 
day's worth of a single
          width="900" height="390"
          alt="Score data for two teams.">
 </figure>
-Figure 3: Score data for two teams. Each team's scores are divided into 
logical windows based on when those scores occurred in event time.
+**Figure 3:** Score data for two teams. Each team's scores are divided into 
logical windows based on when those scores occurred in event time.
 
 Notice that as processing time advances, the sums are now _per window_; each 
window represents an hour of _event time_ during the day in which the scores 
occurred.
 
@@ -251,26 +170,14 @@ Beam's windowing feature uses the [intrinsic timestamp 
information]({{ site.base
 {:.language-py}
 `HourlyTeamScore` uses the `FixedWindows` transform, found in 
[window.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/window.py),
 to perform these operations.
 
-The following code shows this: 
+The following code shows this:
 
 ```java
-// Add an element timestamp based on the event log, and apply fixed windowing.
-    .apply("AddEventTimestamps",
-           WithTimestamps.of((GameActionInfo i) -> new 
Instant(i.getTimestamp())))
-    .apply("FixedWindowsTeam", Window.<GameActionInfo>into(
-        
FixedWindows.of(Duration.standardMinutes(options.getWindowDuration()))))
-```
-
+{% github_sample 
/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
 tag:DocInclude_HTSAddTsAndWindow
+%}```
 ```py
-# Add an element timestamp based on the event log, and apply fixed windowing.
-# Convert element['timestamp'] into seconds as expected by TimestampedValue.
-| 'AddEventTimestamps' >> beam.Map(
-    lambda element: TimestampedValue(
-        element, element['timestamp'] / 1000.0))
-# Convert window_duration into seconds as expected by FixedWindows.
-| 'FixedWindowsTeam' >> beam.WindowInto(FixedWindows(
-    size=self.window_duration * 60))
-```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
 tag:add_timestamp_and_window
+%}```
 
 Notice that the transforms the pipeline uses to specify the windowing are 
distinct from the actual data processing transforms (such as 
`ExtractAndSumScores`). This functionality provides you some flexibility in 
designing your Beam pipeline, in that you can run existing transforms over 
datasets with different windowing characteristics.
 
@@ -285,133 +192,22 @@ It also lets the pipeline include relevant **late 
data**—data events with vali
 The following code shows how `HourlyTeamScore` uses the `Filter` transform to 
filter events that occur either before or after the relevant analysis period:
 
 ```java
-.apply("FilterStartTime", Filter.by(
-    (GameActionInfo gInfo)
-        -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
-.apply("FilterEndTime", Filter.by(
-    (GameActionInfo gInfo)
-        -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
-```
-
+{% github_sample 
/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
 tag:DocInclude_HTSFilters
+%}```
 ```py
-| 'FilterStartTime' >> beam.Filter(
-    lambda element: element['timestamp'] > start_min_filter)
-| 'FilterEndTime' >> beam.Filter(
-    lambda element: element['timestamp'] < end_min_filter)
-```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
 tag:filter_by_time_range
+%}```
 
 #### Calculating Score Per Team, Per Window
 
 `HourlyTeamScore` uses the same `ExtractAndSumScores` transform as the 
`UserScore` pipeline, but passes a different key (team, as opposed to user). 
Also, because the pipeline applies `ExtractAndSumScores` _after_ applying 
fixed-time 1-hour windowing to the input data, the data gets grouped by both 
team _and_ window. You can see the full sequence of transforms in 
`HourlyTeamScore`'s main method:
 
 ```java
-public static void main(String[] args) throws Exception {
-  // Begin constructing a pipeline configured by commandline flags.
-  Options options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-  Pipeline pipeline = Pipeline.create(options);
-
-  final Instant stopMinTimestamp = new 
Instant(minFmt.parseMillis(options.getStopMin()));
-  final Instant startMinTimestamp = new 
Instant(minFmt.parseMillis(options.getStartMin()));
-
-  // Read 'gaming' events from a text file.
-  pipeline.apply(TextIO.read().from(options.getInput()))
-    // Parse the incoming data.
-    .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
-
-    // Filter out data before and after the given times so that it is not 
included
-    // in the calculations. As we collect data in batches (say, by day), the 
batch for the day
-    // that we want to analyze could potentially include some late-arriving 
data from the previous
-    // day. If so, we want to weed it out. Similarly, if we include data from 
the following day
-    // (to scoop up late-arriving events from the day we're analyzing), we 
need to weed out events
-    // that fall after the time period we want to analyze.
-    // [START DocInclude_HTSFilters]
-    .apply("FilterStartTime", Filter.by(
-        (GameActionInfo gInfo)
-            -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
-    .apply("FilterEndTime", Filter.by(
-        (GameActionInfo gInfo)
-            -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
-    // [END DocInclude_HTSFilters]
-
-    // [START DocInclude_HTSAddTsAndWindow]
-    // Add an element timestamp based on the event log, and apply fixed 
windowing.
-    .apply("AddEventTimestamps",
-           WithTimestamps.of((GameActionInfo i) -> new 
Instant(i.getTimestamp())))
-    .apply("FixedWindowsTeam", Window.<GameActionInfo>into(
-        
FixedWindows.of(Duration.standardMinutes(options.getWindowDuration()))))
-    // [END DocInclude_HTSAddTsAndWindow]
-
-    // Extract and sum teamname/score pairs from the event data.
-    .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
-    .apply("WriteTeamScoreSums",
-      new WriteWindowedToBigQuery<KV<String, Integer>>(options.getTableName(),
-          configureWindowedTableWrite()));
-
-  pipeline.run().waitUntilFinish();
-}
-```
-
+{% github_sample 
/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
 tag:DocInclude_HTSMain
+%}```
 ```py
-class HourlyTeamScore(beam.PTransform):
-  def __init__(self, start_min, stop_min, window_duration):
-    super(HourlyTeamScore, self).__init__()
-    self.start_min = start_min
-    self.stop_min = stop_min
-    self.window_duration = window_duration
-
-  def expand(self, pcoll):
-    start_min_filter = string_to_timestamp(self.start_min)
-    end_min_filter = string_to_timestamp(self.stop_min)
-
-    return (
-        pcoll
-        | 'ParseGameEvent' >> beam.ParDo(ParseEventFn())
-        # Filter out data before and after the given times so that it is not
-        # included in the calculations. As we collect data in batches (say, by
-        # day), the batch for the day that we want to analyze could potentially
-        # include some late-arriving data from the previous day. If so, we want
-        # to weed it out. Similarly, if we include data from the following day
-        # (to scoop up late-arriving events from the day we're analyzing), we
-        # need to weed out events that fall after the time period we want to
-        # analyze.
-        | 'FilterStartTime' >> beam.Filter(
-            lambda element: element['timestamp'] > start_min_filter)
-        | 'FilterEndTime' >> beam.Filter(
-            lambda element: element['timestamp'] < end_min_filter)
-        # Add an element timestamp based on the event log, and apply fixed
-        # windowing.
-        # Convert element['timestamp'] into seconds as expected by
-        # TimestampedValue.
-        | 'AddEventTimestamps' >> beam.Map(
-            lambda element: TimestampedValue(
-                element, element['timestamp'] / 1000.0))
-        # Convert window_duration into seconds as expected by FixedWindows.
-        | 'FixedWindowsTeam' >> beam.WindowInto(FixedWindows(
-            size=self.window_duration * 60))
-        # Extract and sum teamname/score pairs from the event data.
-        | 'ExtractTeamScore' >> ExtractAndSumScore('team'))
-
-
-def run(argv=None):
-  """Main entry point; defines and runs the hourly_team_score pipeline."""
-  ...
-
-  known_args, pipeline_args = parser.parse_known_args(argv)
-
-  pipeline_options = PipelineOptions(pipeline_args)
-  p = beam.Pipeline(options=pipeline_options)
-  pipeline_options.view_as(SetupOptions).save_main_session = True
-
-  (p  # pylint: disable=expression-not-assigned
-   | ReadFromText(known_args.input)
-   | HourlyTeamScore(
-       known_args.start_min, known_args.stop_min, known_args.window_duration)
-   | WriteWindowedToBigQuery(
-       known_args.table_name, known_args.dataset, configure_bigquery_write()))
-
-  result = p.run()
-  result.wait_until_finish()
-```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
 tag:main
+%}```
 
 ### Limitations
 
@@ -419,26 +215,27 @@ As written, `HourlyTeamScore` still has a limitation:
 
 * `HourlyTeamScore` still has **high latency** between when data events occur 
(the event time) and when results are generated (the processing time), because, 
as a batch pipeline, it needs to wait to begin processing until all data events 
are present.
 
-
 ## LeaderBoard: Streaming Processing with Real-Time Game Data
 
-> **Note:** This example currently exists in Java only.
-
 One way we can help address the latency issue present in the `UserScore` and 
`HourlyTeamScore` pipelines is by reading the score data from an unbounded 
source. The `LeaderBoard` pipeline introduces streaming processing by reading 
the game score data from an unbounded source that produces an infinite amount 
of data, rather than from a file on the game server.
 
 The `LeaderBoard` pipeline also demonstrates how to process game score data 
with respect to both _processing time_ and _event time_. `LeaderBoard` outputs 
data about both individual user scores and about team scores, each with respect 
to a different time frame.
 
-Because the `LeaderBoard` pipeline reads the game data from an unbounded 
source as that data is generated, you can think of the pipeline as an ongoing 
job running concurrently with the game process. `LeaderBoard` can thus provide 
low-latency insights into how users are playing the game at any given 
moment—useful if, for example, we want to provide a live web-based scoreboard 
so that users can track their progress against other users as they play.
+Because the `LeaderBoard` pipeline reads the game data from an unbounded 
source as that data is generated, you can think of the pipeline as an ongoing 
job running concurrently with the game process. `LeaderBoard` can thus provide 
low-latency insights into how users are playing the game at any given moment — 
useful if, for example, we want to provide a live web-based scoreboard so that 
users can track their progress against other users as they play.
 
+{:.language-java}
 > **Note:** See [LeaderBoard on 
 > GitHub](https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java)
 >  for the complete example pipeline program.
 
+{:.language-py}
+> **Note:** See [LeaderBoard on 
GitHub](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/leader_board.py)
 for the complete example pipeline program.
+
 ### What Does LeaderBoard Do?
 
 The `LeaderBoard` pipeline reads game data published to an unbounded source 
that produces an infinite amount of data in near real-time, and uses that data 
to perform two separate processing tasks:
 
-* `LeaderBoard` calculates the total score for every unique user and publishes 
speculative results for every ten minutes of _processing time_. That is, every 
ten minutes, the pipeline outputs the total score per user that the pipeline 
has processed to date. This calculation provides a running "leader board" in 
close to real time, regardless of when the actual game events were generated.
+* `LeaderBoard` calculates the total score for every unique user and publishes 
speculative results for every ten minutes of _processing time_. That is, ten 
minutes after data is received, the pipeline outputs the total score per user 
that the pipeline has processed to date. This calculation provides a running 
"leader board" in close to real time, regardless of when the actual game events 
were generated.
 
-* `LeaderBoard` calculates the team scores for each hour that the pipeline 
runs. This is useful if we want to, for example, reward the top-scoring team 
for each hour of play. The team score calculation uses fixed-time windowing to 
divide the input data into hour-long finite windows based on the _event time_ 
(indicated by the timestamp) as data arrives in the pipeline.  
+* `LeaderBoard` calculates the team scores for each hour that the pipeline 
runs. This is useful if we want to, for example, reward the top-scoring team 
for each hour of play. The team score calculation uses fixed-time windowing to 
divide the input data into hour-long finite windows based on the _event time_ 
(indicated by the timestamp) as data arrives in the pipeline.
 
     In addition, the team score calculation uses Beam's trigger mechanisms to 
provide speculative results for each hour (which update every five minutes 
until the hour is up), and to also capture any late data and add it to the 
specific hour-long window to which it belongs.
 
@@ -446,57 +243,35 @@ Below, we'll look at both of these tasks in detail.
 
 #### Calculating User Score based on Processing Time
 
-We want our pipeline to output a running total score for each user for every 
ten minutes that the pipeline runs. This calculation doesn't consider _when_ 
the actual score was generated by the user's play instance; it simply outputs 
the sum of all the scores for that user that have arrived in the pipeline to 
date. Late data gets included in the calculation whenever it happens to arrive 
in the pipeline as it's running.
+We want our pipeline to output a running total score for each user for every 
ten minutes of processing time. This calculation doesn't consider _when_ the 
actual score was generated by the user's play instance; it simply outputs the 
sum of all the scores for that user that have arrived in the pipeline to date. 
Late data gets included in the calculation whenever it happens to arrive in the 
pipeline as it's running.
 
 Because we want all the data that has arrived in the pipeline every time we 
update our calculation, we have the pipeline consider all of the user score 
data in a **single global window**. The single global window is unbounded, but 
we can specify a kind of temporary cut-off point for each ten-minute 
calculation by using a processing time [trigger]({{ site.baseurl 
}}/documentation/programming-guide/#triggers).
 
-When we specify a ten-minute processing time trigger for the single global 
window, the pipeline effectively takes a "snapshot" of the contents of the 
window every time the trigger fires. This snapshot happens at ten-minute 
intervals as long as data has arrived. If no data has arrived, the pipeline 
will take its next "snapshot" 10 minutes past an element arriving. Since we're 
using a single global window, each snapshot contains all the data collected _to 
that point in time_. The following [...]
+When we specify a ten-minute processing time trigger for the single global 
window, the pipeline effectively takes a "snapshot" of the contents of the 
window every time the trigger fires. This snapshot happens after ten minutes 
have passed since data was received. If no data has arrived, the pipeline takes 
its next "snapshot" 10 minutes after an element arrives. Since we're using a 
single global window, each snapshot contains all the data collected _to that 
point in time_. The following d [...]
 
 <figure id="fig4">
     <img src="{{ site.baseurl }}/images/gaming-example-proc-time-narrow.gif"
          width="900" height="263"
-         alt="Score data for for three users.">
+         alt="Score data for three users.">
 </figure>
-Figure 4: Score data for for three users. Each user's scores are grouped 
together in a single global window, with a trigger that generates a snapshot 
for output every ten minutes.
+**Figure 4:** Score data for three users. Each user's scores are grouped 
together in a single global window, with a trigger that generates a snapshot 
for output ten minutes after data is received.
 
 As processing time advances and more scores are processed, the trigger outputs 
the updated sum for each user.
 
 The following code example shows how `LeaderBoard` sets the processing time 
trigger to output the data for user scores:
 
 ```java
-/**
- * Extract user/score pairs from the event stream using processing time, via 
global windowing.
- * Get periodic updates on all users' running scores.
- */
-@VisibleForTesting
-static class CalculateUserScores
-    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, 
Integer>>> {
-  private final Duration allowedLateness;
-
-  CalculateUserScores(Duration allowedLateness) {
-    this.allowedLateness = allowedLateness;
-  }
-
-  @Override
-  public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> 
input) {
-    return input.apply("LeaderboardUserGlobalWindow",
-        Window.<GameActionInfo>into(new GlobalWindows())
-            // Get periodic results every ten minutes.
-            
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
-                .plusDelayOf(TEN_MINUTES)))
-            .accumulatingFiredPanes()
-            .withAllowedLateness(allowedLateness))
-        // Extract and sum username/score pairs from the event data.
-        .apply("ExtractUserScore", new ExtractAndSumScore("user"));
-  }
-}
-```
-
-Note that `LeaderBoard` uses an accumulating trigger for the user score 
calculation (by invoking `.accumulatingFiredPanes` when setting the trigger). 
Using an accumulating trigger causes the pipeline to accumulate the previously 
emitted data together with any new data that's arrived since the last trigger 
fire. This ensures that `LeaderBoard` a running sum for the user scores, rather 
than a collection of individual sums.
+{% github_sample 
/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
 tag:DocInclude_ProcTimeTrigger
+%}```
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/leader_board.py
 tag:processing_time_trigger
+%}```
+
+Note that `LeaderBoard` uses an accumulating trigger for the user score 
calculation (by <span class="language-java">invoking 
`.accumulatingFiredPanes`</span> <span class="language-py">using 
`accumulation_mode=trigger.AccumulationMode.ACCUMULATING`</span> when setting 
the trigger). Using an accumulating trigger causes the pipeline to accumulate 
the previously emitted data together with any new data that's arrived since the 
last trigger fire. This ensures that `LeaderBoard` is a running su [...]
 
 #### Calculating Team Score based on Event Time
 
-We want our pipeline to also output the total score for each team during each 
hour of play. Unlike the user score calculation, for team scores, we care about 
when in _event_ time each score actually occurred, because we want to consider 
each hour of play individually. We also want to provide speculative updates as 
each individual hour progresses, and to allow any instances of late data—data 
that arrives after a given hour's data is considered complete—to be included in 
our calculation.
+We want our pipeline to also output the total score for each team during each 
hour of play. Unlike the user score calculation, for team scores, we care about 
when in _event_ time each score actually occurred, because we want to consider 
each hour of play individually. We also want to provide speculative updates as 
each individual hour progresses, and to allow any instances of late data — data 
that arrives after a given hour's data is considered complete — to be included 
in our calculation.
 
 Because we consider each hour individually, we can apply fixed-time windowing 
to our input data, just like in `HourlyTeamScore`. To provide the speculative 
updates and updates on late data, we'll specify additional trigger parameters. 
The trigger will cause each window to calculate and emit results at an interval 
we specify (in this case, every five minutes), and also to keep triggering 
after the window is considered "complete" to account for late data. Just like 
the user score calculati [...]
 
@@ -511,57 +286,35 @@ The following diagram shows the relationship between 
ongoing processing time and
          width="900" height="390"
          alt="Score data by team, windowed by event time.">
 </figure>
-Figure 5: Score data by team, windowed by event time. A trigger based on 
processing time causes the window to emit speculative early results and include 
late results.
+**Figure 5:** Score data by team, windowed by event time. A trigger based on 
processing time causes the window to emit speculative early results and include 
late results.
 
 The dotted line in the diagram is the "ideal" **watermark**: Beam's notion of 
when all data in a given window can reasonably be considered to have arrived. 
The irregular solid line represents the actual watermark, as determined by the 
data source.
 
-Data arriving above the solid watermark line is _late data_—this is a score 
event that was delayed (perhaps generated offline) and arrived after the window 
to which it belongs had closed. Our pipeline's late-firing trigger ensures that 
this late data is still included in the sum.
+Data arriving above the solid watermark line is _late data_ — this is a score 
event that was delayed (perhaps generated offline) and arrived after the window 
to which it belongs had closed. Our pipeline's late-firing trigger ensures that 
this late data is still included in the sum.
 
 The following code example shows how `LeaderBoard` applies fixed-time 
windowing with the appropriate triggers to have our pipeline perform the 
calculations we want:
 
 ```java
-// Extract team/score pairs from the event stream, using hour-long windows by 
default.
-static class CalculateTeamScores
-    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, 
Integer>>> {
-  private final Duration teamWindowDuration;
-  private final Duration allowedLateness;
-
-  CalculateTeamScores(Duration teamWindowDuration, Duration allowedLateness) {
-    this.teamWindowDuration = teamWindowDuration;
-    this.allowedLateness = allowedLateness;
-  }
-
-  @Override
-  public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> 
infos) {
-    return infos.apply("LeaderboardTeamFixedWindows",
-        Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
-            // We will get early (speculative) results as well as cumulative
-            // processing of late data.
-            .triggering(AfterWatermark.pastEndOfWindow()
-                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
-                    .plusDelayOf(FIVE_MINUTES))
-                .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
-                    .plusDelayOf(TEN_MINUTES)))
-            .withAllowedLateness(allowedLateness)
-            .accumulatingFiredPanes())
-        // Extract and sum teamname/score pairs from the event data.
-        .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
-  }
-}
-```
+{% github_sample 
/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
 tag:DocInclude_WindowAndTrigger
+%}```
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/leader_board.py
 tag:window_and_trigger
+%}```
 
 Taken together, these processing strategies let us address the latency and 
completeness issues present in the `UserScore` and `HourlyTeamScore` pipelines, 
while still using the same basic transforms to process the data—as a matter of 
fact, both calculations still use the same `ExtractAndSumScore` transform that 
we used in both the `UserScore` and `HourlyTeamScore` pipelines.
 
 ## GameStats: Abuse Detection and Usage Analysis
 
-> **Note:** This example currently exists in Java only.
-
 While `LeaderBoard` demonstrates how to use basic windowing and triggers to 
perform low-latency and flexible data analysis, we can use more advanced 
windowing techniques to perform more comprehensive analysis. This might include 
some calculations designed to detect system abuse (like spam) or to gain 
insight into user behavior. The `GameStats` pipeline builds on the low-latency 
functionality in `LeaderBoard` to demonstrate how you can use Beam to perform 
this kind of advanced analysis.
 
 Like `LeaderBoard`, `GameStats` reads data from an unbounded source. It is 
best thought of as an ongoing job that provides insight into the game as users 
play.
 
+{:.language-java}
 > **Note:** See [GameStats on 
 > GitHub](https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java)
 >  for the complete example pipeline program.
 
+{:.language-py}
+> **Note:** See [GameStats on 
GitHub](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/game_stats.py)
 for the complete example pipeline program.
+
 ### What Does GameStats Do?
 
 Like `LeaderBoard`, `GameStats` calculates the total score per team, per hour. 
However, the pipeline also performs two kinds of more complex analysis:
@@ -582,72 +335,20 @@ Since the average depends on the pipeline data, we need 
to calculate it, and the
 The following code example shows the composite transform that handles abuse 
detection. The transform uses the `Sum.integersPerKey` transform to sum all 
scores per user, and then the `Mean.globally` transform to determine the 
average score for all users. Once that's been calculated (as a 
`PCollectionView` singleton), we can pass it to the filtering `ParDo` using 
`.withSideInputs`:
 
 ```java
-public static class CalculateSpammyUsers
-    extends PTransform<PCollection<KV<String, Integer>>, 
PCollection<KV<String, Integer>>> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(CalculateSpammyUsers.class);
-  private static final double SCORE_WEIGHT = 2.5;
-
-  @Override
-  public PCollection<KV<String, Integer>> expand(PCollection<KV<String, 
Integer>> userScores) {
-
-    // Get the sum of scores for each user.
-    PCollection<KV<String, Integer>> sumScores = userScores
-        .apply("UserSum", Sum.<String>integersPerKey());
-
-    // Extract the score from each element, and use it to find the global mean.
-    final PCollectionView<Double> globalMeanScore = 
sumScores.apply(Values.<Integer>create())
-        .apply(Mean.<Integer>globally().asSingletonView());
-
-    // Filter the user sums using the global mean.
-    PCollection<KV<String, Integer>> filtered = sumScores
-        .apply("ProcessAndFilter", ParDo
-            .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
-              private final Aggregator<Long, Long> numSpammerUsers =
-                createAggregator("SpammerUsers", new Sum.SumLongFn());
-              @ProcessElement
-              public void processElement(ProcessContext c) {
-                Integer score = c.element().getValue();
-                Double gmc = c.sideInput(globalMeanScore);
-                if (score > (gmc * SCORE_WEIGHT)) {
-                  LOG.info("user " + c.element().getKey() + " spammer score " 
+ score
-                      + " with mean " + gmc);
-                  numSpammerUsers.addValue(1L);
-                  c.output(c.element());
-                }
-              }
-            })
-            // use the derived mean total score as a side input
-            .withSideInputs(globalMeanScore));
-    return filtered;
-  }
-}
-```
+{% github_sample 
/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
 tag:DocInclude_AbuseDetect
+%}```
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/game_stats.py
 tag:abuse_detect
+%}```
 
 The abuse-detection transform generates a view of users supected to be 
spambots. Later in the pipeline, we use that view to filter out any such users 
when we calculate the team score per hour, again by using the side input 
mechanism. The following code example shows where we insert the spam filter, 
between windowing the scores into fixed windows and extracting the team scores:
 
 ```java
-// Calculate the total score per team over fixed windows,
-// and emit cumulative updates for late data. Uses the side input derived 
above-- the set of
-// suspected robots-- to filter out scores from those users from the sum.
-// Write the results to BigQuery.
-rawEvents
-  .apply("WindowIntoFixedWindows", Window.<GameActionInfo>into(
-      
FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
-  // Filter out the detected spammer users, using the side input derived above.
-  .apply("FilterOutSpammers", ParDo
-          .of(new DoFn<GameActionInfo, GameActionInfo>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              // If the user is not in the spammers Map, output the data 
element.
-              if (c.sideInput(spammersView).get(c.element().getUser().trim()) 
== null) {
-                c.output(c.element());
-              }
-            }
-          })
-          .withSideInputs(spammersView))
-  // Extract and sum teamname/score pairs from the event data.
-  .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
-```
+{% github_sample 
/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
 tag:DocInclude_FilterAndCalc
+%}```
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/game_stats.py
 tag:filter_and_calc
+%}```
 
 #### Analyzing Usage Patterns
 
@@ -663,39 +364,25 @@ The following diagram shows how data might look when 
grouped into session window
          alt="A diagram representing session windowing."
          alt="User sessions, with a minimum gap duration.">
 </figure>
-Figure 6: User sessions, with a minimum gap duration. Note how each user has 
different sessions, according to how many instances they play and how long 
their breaks between instances are.
+**Figure 6:** User sessions, with a minimum gap duration. Note how each user 
has different sessions, according to how many instances they play and how long 
their breaks between instances are.
 
 We can use the session-windowed data to determine the average length of 
uninterrupted play time for all of our users, as well as the total score they 
achieve during each session. We can do this in the code by first applying 
session windows, summing the score per user and session, and then using a 
transform to calculate the length of each individual session:
 
 ```java
-// Detect user sessions-- that is, a burst of activity separated by a gap from 
further
-// activity. Find and record the mean session lengths.
-// This information could help the game designers track the changing user 
engagement
-// as their set of games changes.
-userEvents
-  .apply("WindowIntoSessions", Window.<KV<String, Integer>>into(
-      
Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
-      .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
-  // For this use, we care only about the existence of the session, not any 
particular
-  // information aggregated over it, so the following is an efficient way to 
do that.
-  .apply(Combine.perKey(x -> 0))
-  // Get the duration per session.
-  .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
-```
+{% github_sample 
/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
 tag:DocInclude_SessionCalc
+%}```
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/game_stats.py
 tag:session_calc
+%}```
 
 This gives us a set of user sessions, each with an attached duration. We can 
then calculate the _average_ session length by re-windowing the data into fixed 
time windows, and then calculating the average for all sessions that end in 
each hour:
 
 ```java
-// Re-window to process groups of session sums according to when the sessions 
complete.
-.apply("WindowToExtractSessionMean", Window.<Integer>into(
-    
FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
-// Find the mean session duration in each window.
-.apply(Mean.<Integer>globally().withoutDefaults())
-// Write this info to a BigQuery table.
-.apply("WriteAvgSessionLength",
-       new WriteWindowedToBigQuery<Double>(
-          options.getTablePrefix() + "_sessions", 
configureSessionWindowWrite()));
-```
+{% github_sample 
/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
 tag:DocInclude_Rewindow
+%}```
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/game_stats.py
 tag:rewindow
+%}```
 
 We can use the resulting information to find, for example, what times of day 
our users are playing the longest, or which stretches of the day are more 
likely to see shorter play sessions.
 

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to