Repository: beam
Updated Branches:
  refs/heads/master c7ff46d46 -> f8e119292


Added snippet tags for documentation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/71e323c9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/71e323c9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/71e323c9

Branch: refs/heads/master
Commit: 71e323c98ee5b2ec829309c05269a1e6ba4d5832
Parents: c7ff46d
Author: David Cavazos <[email protected]>
Authored: Mon Aug 28 15:24:20 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Fri Sep 8 11:24:13 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/game/game_stats.py        | 30 ++++---
 .../examples/complete/game/hourly_team_score.py | 89 +++++++++++---------
 .../examples/complete/game/leader_board.py      | 26 +++---
 .../examples/complete/game/user_score.py        |  4 +
 4 files changed, 83 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/71e323c9/sdks/python/apache_beam/examples/complete/game/game_stats.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats.py 
b/sdks/python/apache_beam/examples/complete/game/game_stats.py
index 4181323..d8c60dd 100644
--- a/sdks/python/apache_beam/examples/complete/game/game_stats.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats.py
@@ -200,6 +200,7 @@ class WriteToBigQuery(beam.PTransform):
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
 
 
+# [START abuse_detect]
 class CalculateSpammyUsers(beam.PTransform):
   """Filter out all but those users with a high clickrate, which we will
  consider as 'spammy' users.
@@ -232,6 +233,7 @@ class CalculateSpammyUsers(beam.PTransform):
                 score > global_mean * self.SCORE_WEIGHT,
             global_mean_score))
     return filtered
+# [END abuse_detect]
 
 
 class UserSessionActivity(beam.DoFn):
@@ -325,16 +327,11 @@ def run(argv=None):
         | 'CreateSpammersView' >> beam.CombineGlobally(
             beam.combiners.ToDictCombineFn()).as_singleton_view())
 
+    # [START filter_and_calc]
     # Calculate the total score per team over fixed windows, and emit 
cumulative
     # updates for late data. Uses the side input derived above --the set of
     # suspected robots-- to filter out scores from those users from the sum.
     # Write the results to BigQuery.
-    teams_schema = {
-        'team': 'STRING',
-        'total_score': 'INTEGER',
-        'window_start': 'STRING',
-        'processing_time': 'STRING',
-    }
     (raw_events  # pylint: disable=expression-not-assigned
      | 'WindowIntoFixedWindows' >> beam.WindowInto(
          beam.window.FixedWindows(fixed_window_duration))
@@ -345,17 +342,21 @@ def run(argv=None):
          spammers_view)
      # Extract and sum teamname/score pairs from the event data.
      | 'ExtractAndSumScore' >> ExtractAndSumScore('team')
+     # [END filter_and_calc]
      | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
      | 'WriteTeamScoreSums' >> WriteToBigQuery(
-         args.table_name + '_teams', args.dataset, teams_schema))
-
+         args.table_name + '_teams', args.dataset, {
+             'team': 'STRING',
+             'total_score': 'INTEGER',
+             'window_start': 'STRING',
+             'processing_time': 'STRING',
+         }))
+
+    # [START session_calc]
     # Detect user sessions-- that is, a burst of activity separated by a gap
     # from further activity. Find and record the mean session lengths.
     # This information could help the game designers track the changing user
     # engagement as their set of game changes.
-    sessions_schema = {
-        'mean_duration': 'FLOAT',
-    }
     (user_events  # pylint: disable=expression-not-assigned
      | 'WindowIntoSessions' >> beam.WindowInto(
          beam.window.Sessions(session_gap),
@@ -368,7 +369,9 @@ def run(argv=None):
 
      # Get the duration of the session
      | 'UserSessionActivity' >> beam.ParDo(UserSessionActivity())
+     # [END session_calc]
 
+     # [START rewindow]
      # Re-window to process groups of session sums according to when the
      # sessions complete
      | 'WindowToExtractSessionMean' >> beam.WindowInto(
@@ -379,7 +382,10 @@ def run(argv=None):
      | 'FormatAvgSessionLength' >> beam.Map(
          lambda elem: {'mean_duration': float(elem)})
      | 'WriteAvgSessionLength' >> WriteToBigQuery(
-         args.table_name + '_sessions', args.dataset, sessions_schema))
+         args.table_name + '_sessions', args.dataset, {
+             'mean_duration': 'FLOAT',
+         }))
+     # [END rewindow]
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/71e323c9/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
----------------------------------------------------------------------
diff --git 
a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py 
b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
index 9dd8b05..b286a6a 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
@@ -138,42 +138,6 @@ class ExtractAndSumScore(beam.PTransform):
             | beam.CombinePerKey(sum))
 
 
-class HourlyTeamScore(beam.PTransform):
-  def __init__(self, start_min, stop_min, window_duration):
-    super(HourlyTeamScore, self).__init__()
-    self.start_timestamp = str2timestamp(start_min)
-    self.stop_timestamp = str2timestamp(stop_min)
-    self.window_duration_in_seconds = window_duration * 60
-
-  def expand(self, pcoll):
-    return (
-        pcoll
-        | 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn())
-
-        # Filter out data before and after the given times so that it is not
-        # included in the calculations. As we collect data in batches (say, by
-        # day), the batch for the day that we want to analyze could potentially
-        # include some late-arriving data from the previous day. If so, we want
-        # to weed it out. Similarly, if we include data from the following day
-        # (to scoop up late-arriving events from the day we're analyzing), we
-        # need to weed out events that fall after the time period we want to
-        # analyze.
-        | 'FilterStartTime' >> beam.Filter(
-            lambda elem: elem['timestamp'] > self.start_timestamp)
-        | 'FilterEndTime' >> beam.Filter(
-            lambda elem: elem['timestamp'] < self.stop_timestamp)
-
-        # Add an element timestamp based on the event log, and apply fixed
-        # windowing.
-        | 'AddEventTimestamps' >> beam.Map(
-            lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
-        | 'FixedWindowsTeam' >> beam.WindowInto(
-            beam.window.FixedWindows(self.window_duration_in_seconds))
-
-        # Extract and sum teamname/score pairs from the event data.
-        | 'ExtractAndSumScore' >> ExtractAndSumScore('team'))
-
-
 class TeamScoresDict(beam.DoFn):
   """Formats the data into a dictionary of BigQuery columns with their values
 
@@ -229,6 +193,47 @@ class WriteToBigQuery(beam.PTransform):
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
 
 
+# [START main]
+class HourlyTeamScore(beam.PTransform):
+  def __init__(self, start_min, stop_min, window_duration):
+    super(HourlyTeamScore, self).__init__()
+    self.start_timestamp = str2timestamp(start_min)
+    self.stop_timestamp = str2timestamp(stop_min)
+    self.window_duration_in_seconds = window_duration * 60
+
+  def expand(self, pcoll):
+    return (
+        pcoll
+        | 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn())
+
+        # Filter out data before and after the given times so that it is not
+        # included in the calculations. As we collect data in batches (say, by
+        # day), the batch for the day that we want to analyze could potentially
+        # include some late-arriving data from the previous day. If so, we want
+        # to weed it out. Similarly, if we include data from the following day
+        # (to scoop up late-arriving events from the day we're analyzing), we
+        # need to weed out events that fall after the time period we want to
+        # analyze.
+        # [START filter_by_time_range]
+        | 'FilterStartTime' >> beam.Filter(
+            lambda elem: elem['timestamp'] > self.start_timestamp)
+        | 'FilterEndTime' >> beam.Filter(
+            lambda elem: elem['timestamp'] < self.stop_timestamp)
+        # [END filter_by_time_range]
+
+        # [START add_timestamp_and_window]
+        # Add an element timestamp based on the event log, and apply fixed
+        # windowing.
+        | 'AddEventTimestamps' >> beam.Map(
+            lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
+        | 'FixedWindowsTeam' >> beam.WindowInto(
+            beam.window.FixedWindows(self.window_duration_in_seconds))
+        # [END add_timestamp_and_window]
+
+        # Extract and sum teamname/score pairs from the event data.
+        | 'ExtractAndSumScore' >> ExtractAndSumScore('team'))
+
+
 def run(argv=None):
   """Main entry point; defines and runs the hourly_team_score pipeline."""
   parser = argparse.ArgumentParser()
@@ -282,11 +287,6 @@ def run(argv=None):
   # workflow rely on global context (e.g., a module imported at module level).
   options.view_as(SetupOptions).save_main_session = True
 
-  schema = {
-      'team': 'STRING',
-      'total_score': 'INTEGER',
-      'window_start': 'STRING',
-  }
   with beam.Pipeline(options=options) as p:
     (p  # pylint: disable=expression-not-assigned
      | 'ReadInputText' >> beam.io.ReadFromText(args.input)
@@ -294,7 +294,12 @@ def run(argv=None):
          args.start_min, args.stop_min, args.window_duration)
      | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
      | 'WriteTeamScoreSums' >> WriteToBigQuery(
-         args.table_name, args.dataset, schema))
+         args.table_name, args.dataset, {
+             'team': 'STRING',
+             'total_score': 'INTEGER',
+             'window_start': 'STRING',
+         }))
+# [END main]
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/71e323c9/sdks/python/apache_beam/examples/complete/game/leader_board.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py 
b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index 2936bc9..69676f8 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -208,6 +208,7 @@ class WriteToBigQuery(beam.PTransform):
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
 
 
+# [START window_and_trigger]
 class CalculateTeamScores(beam.PTransform):
   """Calculates scores for each team within the configured window duration.
 
@@ -234,8 +235,10 @@ class CalculateTeamScores(beam.PTransform):
             accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
         # Extract and sum teamname/score pairs from the event data.
         | 'ExtractAndSumScore' >> ExtractAndSumScore('team'))
+# [END window_and_trigger]
 
 
+# [START processing_time_trigger]
 class CalculateUserScores(beam.PTransform):
   """Extract user/score pairs from the event stream using processing time, via
   global windowing. Get periodic updates on all users' running scores.
@@ -257,6 +260,7 @@ class CalculateUserScores(beam.PTransform):
             accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
         # Extract and sum username/score pairs from the event data.
         | 'ExtractAndSumScore' >> ExtractAndSumScore('user'))
+# [END processing_time_trigger]
 
 
 def run(argv=None):
@@ -313,30 +317,28 @@ def run(argv=None):
             lambda elem: beam.window.TimestampedValue(elem, 
elem['timestamp'])))
 
     # Get team scores and write the results to BigQuery
-    teams_schema = {
-        'team': 'STRING',
-        'total_score': 'INTEGER',
-        'window_start': 'STRING',
-        'processing_time': 'STRING',
-    }
     (events  # pylint: disable=expression-not-assigned
      | 'CalculateTeamScores' >> CalculateTeamScores(
          args.team_window_duration, args.allowed_lateness)
      | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
      | 'WriteTeamScoreSums' >> WriteToBigQuery(
-         args.table_name + '_teams', args.dataset, teams_schema))
+         args.table_name + '_teams', args.dataset, {
+             'team': 'STRING',
+             'total_score': 'INTEGER',
+             'window_start': 'STRING',
+             'processing_time': 'STRING',
+         }))
 
     # Get user scores and write the results to BigQuery
-    users_schema = {
-        'user': 'STRING',
-        'total_score': 'INTEGER',
-    }
     (events  # pylint: disable=expression-not-assigned
      | 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)
      | 'FormatUserScoreSums' >> beam.Map(
          lambda (user, score): {'user': user, 'total_score': score})
      | 'WriteUserScoreSums' >> WriteToBigQuery(
-         args.table_name + '_users', args.dataset, users_schema))
+         args.table_name + '_users', args.dataset, {
+             'user': 'STRING',
+             'total_score': 'INTEGER',
+         }))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/71e323c9/sdks/python/apache_beam/examples/complete/game/user_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py 
b/sdks/python/apache_beam/examples/complete/game/user_score.py
index ee78d63..cf9976d 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -93,6 +93,7 @@ class ParseGameEventFn(beam.DoFn):
       logging.error('Parse error on "%s"', elem)
 
 
+# [START extract_and_sum_score]
 class ExtractAndSumScore(beam.PTransform):
   """A transform to extract key/score information and sum the scores.
   The constructor argument `field` determines whether 'team' or 'user' info is
@@ -106,6 +107,7 @@ class ExtractAndSumScore(beam.PTransform):
     return (pcoll
             | beam.Map(lambda elem: (elem[self.field], elem['score']))
             | beam.CombinePerKey(sum))
+# [END extract_and_sum_score]
 
 
 class UserScore(beam.PTransform):
@@ -117,6 +119,7 @@ class UserScore(beam.PTransform):
         | 'ExtractAndSumScore' >> ExtractAndSumScore('user'))
 
 
+# [START main]
 def run(argv=None):
   """Main entry point; defines and runs the user_score pipeline."""
   parser = argparse.ArgumentParser()
@@ -141,6 +144,7 @@ def run(argv=None):
      | 'FormatUserScoreSums' >> beam.Map(
          lambda (user, score): 'user: %s, total_score: %s' % (user, score))
      | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))
+# [END main]
 
 
 if __name__ == '__main__':

Reply via email to