Repository: beam Updated Branches: refs/heads/master c7ff46d46 -> f8e119292
Added snippet tags for documentation Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/71e323c9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/71e323c9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/71e323c9 Branch: refs/heads/master Commit: 71e323c98ee5b2ec829309c05269a1e6ba4d5832 Parents: c7ff46d Author: David Cavazos <[email protected]> Authored: Mon Aug 28 15:24:20 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Fri Sep 8 11:24:13 2017 -0700 ---------------------------------------------------------------------- .../examples/complete/game/game_stats.py | 30 ++++--- .../examples/complete/game/hourly_team_score.py | 89 +++++++++++--------- .../examples/complete/game/leader_board.py | 26 +++--- .../examples/complete/game/user_score.py | 4 + 4 files changed, 83 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/71e323c9/sdks/python/apache_beam/examples/complete/game/game_stats.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats.py b/sdks/python/apache_beam/examples/complete/game/game_stats.py index 4181323..d8c60dd 100644 --- a/sdks/python/apache_beam/examples/complete/game/game_stats.py +++ b/sdks/python/apache_beam/examples/complete/game/game_stats.py @@ -200,6 +200,7 @@ class WriteToBigQuery(beam.PTransform): write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))) +# [START abuse_detect] class CalculateSpammyUsers(beam.PTransform): """Filter out all but those users with a high clickrate, which we will consider as 'spammy' uesrs. @@ -232,6 +233,7 @@ class CalculateSpammyUsers(beam.PTransform): score > global_mean * self.SCORE_WEIGHT, global_mean_score)) return filtered +# [END abuse_detect] class UserSessionActivity(beam.DoFn): @@ -325,16 +327,11 @@ def run(argv=None): | 'CreateSpammersView' >> beam.CombineGlobally( beam.combiners.ToDictCombineFn()).as_singleton_view()) + # [START filter_and_calc] # Calculate the total score per team over fixed windows, and emit cumulative # updates for late data. Uses the side input derived above --the set of # suspected robots-- to filter out scores from those users from the sum. # Write the results to BigQuery. - teams_schema = { - 'team': 'STRING', - 'total_score': 'INTEGER', - 'window_start': 'STRING', - 'processing_time': 'STRING', - } (raw_events # pylint: disable=expression-not-assigned | 'WindowIntoFixedWindows' >> beam.WindowInto( beam.window.FixedWindows(fixed_window_duration)) @@ -345,17 +342,21 @@ def run(argv=None): spammers_view) # Extract and sum teamname/score pairs from the event data. | 'ExtractAndSumScore' >> ExtractAndSumScore('team') + # [END filter_and_calc] | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict()) | 'WriteTeamScoreSums' >> WriteToBigQuery( - args.table_name + '_teams', args.dataset, teams_schema)) - + args.table_name + '_teams', args.dataset, { + 'team': 'STRING', + 'total_score': 'INTEGER', + 'window_start': 'STRING', + 'processing_time': 'STRING', + })) + + # [START session_calc] # Detect user sessions-- that is, a burst of activity separated by a gap # from further activity. Find and record the mean session lengths. # This information could help the game designers track the changing user # engagement as their set of game changes. - sessions_schema = { - 'mean_duration': 'FLOAT', - } (user_events # pylint: disable=expression-not-assigned | 'WindowIntoSessions' >> beam.WindowInto( beam.window.Sessions(session_gap), @@ -368,7 +369,9 @@ def run(argv=None): # Get the duration of the session | 'UserSessionActivity' >> beam.ParDo(UserSessionActivity()) + # [END session_calc] + # [START rewindow] # Re-window to process groups of session sums according to when the # sessions complete | 'WindowToExtractSessionMean' >> beam.WindowInto( @@ -379,7 +382,10 @@ def run(argv=None): | 'FormatAvgSessionLength' >> beam.Map( lambda elem: {'mean_duration': float(elem)}) | 'WriteAvgSessionLength' >> WriteToBigQuery( - args.table_name + '_sessions', args.dataset, sessions_schema)) + args.table_name + '_sessions', args.dataset, { + 'mean_duration': 'FLOAT', + })) + # [END rewindow] if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/71e323c9/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py index 9dd8b05..b286a6a 100644 --- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py +++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py @@ -138,42 +138,6 @@ class ExtractAndSumScore(beam.PTransform): | beam.CombinePerKey(sum)) -class HourlyTeamScore(beam.PTransform): - def __init__(self, start_min, stop_min, window_duration): - super(HourlyTeamScore, self).__init__() - self.start_timestamp = str2timestamp(start_min) - self.stop_timestamp = str2timestamp(stop_min) - self.window_duration_in_seconds = window_duration * 60 - - def expand(self, pcoll): - return ( - pcoll - | 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn()) - - # Filter out data before and after the given times so that it is not - # included in the calculations. As we collect data in batches (say, by - # day), the batch for the day that we want to analyze could potentially - # include some late-arriving data from the previous day. If so, we want - # to weed it out. Similarly, if we include data from the following day - # (to scoop up late-arriving events from the day we're analyzing), we - # need to weed out events that fall after the time period we want to - # analyze. - | 'FilterStartTime' >> beam.Filter( - lambda elem: elem['timestamp'] > self.start_timestamp) - | 'FilterEndTime' >> beam.Filter( - lambda elem: elem['timestamp'] < self.stop_timestamp) - - # Add an element timestamp based on the event log, and apply fixed - # windowing. - | 'AddEventTimestamps' >> beam.Map( - lambda elem: beam.window.TimestampedValue(elem, elem['timestamp'])) - | 'FixedWindowsTeam' >> beam.WindowInto( - beam.window.FixedWindows(self.window_duration_in_seconds)) - - # Extract and sum teamname/score pairs from the event data. - | 'ExtractAndSumScore' >> ExtractAndSumScore('team')) - - class TeamScoresDict(beam.DoFn): """Formats the data into a dictionary of BigQuery columns with their values @@ -229,6 +193,47 @@ class WriteToBigQuery(beam.PTransform): write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))) +# [START main] +class HourlyTeamScore(beam.PTransform): + def __init__(self, start_min, stop_min, window_duration): + super(HourlyTeamScore, self).__init__() + self.start_timestamp = str2timestamp(start_min) + self.stop_timestamp = str2timestamp(stop_min) + self.window_duration_in_seconds = window_duration * 60 + + def expand(self, pcoll): + return ( + pcoll + | 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn()) + + # Filter out data before and after the given times so that it is not + # included in the calculations. As we collect data in batches (say, by + # day), the batch for the day that we want to analyze could potentially + # include some late-arriving data from the previous day. If so, we want + # to weed it out. Similarly, if we include data from the following day + # (to scoop up late-arriving events from the day we're analyzing), we + # need to weed out events that fall after the time period we want to + # analyze. + # [START filter_by_time_range] + | 'FilterStartTime' >> beam.Filter( + lambda elem: elem['timestamp'] > self.start_timestamp) + | 'FilterEndTime' >> beam.Filter( + lambda elem: elem['timestamp'] < self.stop_timestamp) + # [END filter_by_time_range] + + # [START add_timestamp_and_window] + # Add an element timestamp based on the event log, and apply fixed + # windowing. + | 'AddEventTimestamps' >> beam.Map( + lambda elem: beam.window.TimestampedValue(elem, elem['timestamp'])) + | 'FixedWindowsTeam' >> beam.WindowInto( + beam.window.FixedWindows(self.window_duration_in_seconds)) + # [END add_timestamp_and_window] + + # Extract and sum teamname/score pairs from the event data. + | 'ExtractAndSumScore' >> ExtractAndSumScore('team')) + + def run(argv=None): """Main entry point; defines and runs the hourly_team_score pipeline.""" parser = argparse.ArgumentParser() @@ -282,11 +287,6 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). options.view_as(SetupOptions).save_main_session = True - schema = { - 'team': 'STRING', - 'total_score': 'INTEGER', - 'window_start': 'STRING', - } with beam.Pipeline(options=options) as p: (p # pylint: disable=expression-not-assigned | 'ReadInputText' >> beam.io.ReadFromText(args.input) @@ -294,7 +294,12 @@ def run(argv=None): args.start_min, args.stop_min, args.window_duration) | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict()) | 'WriteTeamScoreSums' >> WriteToBigQuery( - args.table_name, args.dataset, schema)) + args.table_name, args.dataset, { + 'team': 'STRING', + 'total_score': 'INTEGER', + 'window_start': 'STRING', + })) +# [END main] if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/71e323c9/sdks/python/apache_beam/examples/complete/game/leader_board.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py index 2936bc9..69676f8 100644 --- a/sdks/python/apache_beam/examples/complete/game/leader_board.py +++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py @@ -208,6 +208,7 @@ class WriteToBigQuery(beam.PTransform): write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))) +# [START window_and_trigger] class CalculateTeamScores(beam.PTransform): """Calculates scores for each team within the configured window duration. @@ -234,8 +235,10 @@ class CalculateTeamScores(beam.PTransform): accumulation_mode=trigger.AccumulationMode.ACCUMULATING) # Extract and sum teamname/score pairs from the event data. | 'ExtractAndSumScore' >> ExtractAndSumScore('team')) +# [END window_and_trigger] +# [START processing_time_trigger] class CalculateUserScores(beam.PTransform): """Extract user/score pairs from the event stream using processing time, via global windowing. Get periodic updates on all users' running scores. @@ -257,6 +260,7 @@ class CalculateUserScores(beam.PTransform): accumulation_mode=trigger.AccumulationMode.ACCUMULATING) # Extract and sum username/score pairs from the event data. | 'ExtractAndSumScore' >> ExtractAndSumScore('user')) +# [END processing_time_trigger] def run(argv=None): @@ -313,30 +317,28 @@ def run(argv=None): lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))) # Get team scores and write the results to BigQuery - teams_schema = { - 'team': 'STRING', - 'total_score': 'INTEGER', - 'window_start': 'STRING', - 'processing_time': 'STRING', - } (events # pylint: disable=expression-not-assigned | 'CalculateTeamScores' >> CalculateTeamScores( args.team_window_duration, args.allowed_lateness) | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict()) | 'WriteTeamScoreSums' >> WriteToBigQuery( - args.table_name + '_teams', args.dataset, teams_schema)) + args.table_name + '_teams', args.dataset, { + 'team': 'STRING', + 'total_score': 'INTEGER', + 'window_start': 'STRING', + 'processing_time': 'STRING', + })) # Get user scores and write the results to BigQuery - users_schema = { - 'user': 'STRING', - 'total_score': 'INTEGER', - } (events # pylint: disable=expression-not-assigned | 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness) | 'FormatUserScoreSums' >> beam.Map( lambda (user, score): {'user': user, 'total_score': score}) | 'WriteUserScoreSums' >> WriteToBigQuery( - args.table_name + '_users', args.dataset, users_schema)) + args.table_name + '_users', args.dataset, { + 'user': 'STRING', + 'total_score': 'INTEGER', + })) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/71e323c9/sdks/python/apache_beam/examples/complete/game/user_score.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py index ee78d63..cf9976d 100644 --- a/sdks/python/apache_beam/examples/complete/game/user_score.py +++ b/sdks/python/apache_beam/examples/complete/game/user_score.py @@ -93,6 +93,7 @@ class ParseGameEventFn(beam.DoFn): logging.error('Parse error on "%s"', elem) +# [START extract_and_sum_score] class ExtractAndSumScore(beam.PTransform): """A transform to extract key/score information and sum the scores. The constructor argument `field` determines whether 'team' or 'user' info is @@ -106,6 +107,7 @@ class ExtractAndSumScore(beam.PTransform): return (pcoll | beam.Map(lambda elem: (elem[self.field], elem['score'])) | beam.CombinePerKey(sum)) +# [END extract_and_sum_score] class UserScore(beam.PTransform): @@ -117,6 +119,7 @@ class UserScore(beam.PTransform): | 'ExtractAndSumScore' >> ExtractAndSumScore('user')) +# [START main] def run(argv=None): """Main entry point; defines and runs the user_score pipeline.""" parser = argparse.ArgumentParser() @@ -141,6 +144,7 @@ def run(argv=None): | 'FormatUserScoreSums' >> beam.Map( lambda (user, score): 'user: %s, total_score: %s' % (user, score)) | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)) +# [END main] if __name__ == '__main__':
