This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ed527b  [BEAM-4333] Add integration tests for python mobile games (#5747)
2ed527b is described below

commit 2ed527b880a9e408123974148fbff04da374bba3
Author: mariapython <[email protected]>
AuthorDate: Mon Jun 25 17:50:15 2018 -0700

    [BEAM-4333] Add integration tests for python mobile games (#5747)
    
    Add game_stats_it_test.py
    Fix datasets in *_it_tests not deleted
    Relocate *_it_tests in the same directory as the code they test
---
 .../examples/complete/game/game_stats.py           | 15 ++++-
 .../game/game_stats_it_test.py}                    | 71 +++++++++++-----------
 .../game}/hourly_team_score_it_test.py             |  5 ++
 .../{ => complete/game}/leader_board_it_test.py    |  5 ++
 .../{ => complete/game}/user_score_it_test.py      |  0
 5 files changed, 56 insertions(+), 40 deletions(-)

diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats.py b/sdks/python/apache_beam/examples/complete/game/game_stats.py
index e370764..69f5250 100644
--- a/sdks/python/apache_beam/examples/complete/game/game_stats.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats.py
@@ -241,6 +241,9 @@ def run(argv=None):
                       type=str,
                       required=True,
                       help='Pub/Sub topic to read from')
+  parser.add_argument('--subscription',
+                      type=str,
+                      help='Pub/Sub subscription to read from')
   parser.add_argument('--dataset',
                       type=str,
                       required=True,
@@ -288,10 +291,16 @@ def run(argv=None):
   options.view_as(StandardOptions).streaming = True
 
   with beam.Pipeline(options=options) as p:
-    # Read events from Pub/Sub using custom timestamps
+    # Read game events from Pub/Sub using custom timestamps, which
+    # are extracted from the data elements, and parse the data.
+    if args.subscription:
+      scores = p | 'ReadPubSub' >> beam.io.ReadStringsFromPubSub(
+          subscription=args.subscription)
+    else:
+      scores = p | 'ReadPubSub' >> beam.io.ReadStringsFromPubSub(
+          topic=args.topic)
     raw_events = (
-        p
-        | 'ReadPubSub' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(args.topic)
+        scores
         | 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn())
         | 'AddEventTimestamps' >> beam.Map(
             lambda elem: beam.window.TimestampedValue(elem, elem['timestamp'])))
diff --git a/sdks/python/apache_beam/examples/leader_board_it_test.py b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
similarity index 69%
copy from sdks/python/apache_beam/examples/leader_board_it_test.py
copy to sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
index 44297bd..6dc60d0 100644
--- a/sdks/python/apache_beam/examples/leader_board_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
@@ -15,9 +15,9 @@
 # limitations under the License.
 #
 
-"""End-to-end test for the leader board example.
+"""End-to-end test for the game stats example.
 
-Code: beam/sdks/python/apache_beam/examples/complete/game/leader_board.py
+Code: beam/sdks/python/apache_beam/examples/complete/game/game_stats.py
 Usage:
 
   python setup.py nosetests --test-pipeline-options=" \
@@ -36,12 +36,11 @@ import logging
 import time
 import unittest
 import uuid
-from builtins import range
 
 from hamcrest.core.core.allof import all_of
 from nose.plugins.attrib import attr
 
-from apache_beam.examples.complete.game import leader_board
+from apache_beam.examples.complete.game import game_stats
 from apache_beam.io.gcp.tests import utils
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
 from apache_beam.runners.runner import PipelineState
@@ -50,21 +49,21 @@ from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
 from apache_beam.testing.test_pipeline import TestPipeline
 
 
-class LeaderBoardIT(unittest.TestCase):
+class GameStatsIT(unittest.TestCase):
 
-  # Input event containing user, team, score, processing time, window start.
+  # Input events containing user, team, score, processing time, window start.
   INPUT_EVENT = 'user1,teamA,10,%d,2015-11-02 09:09:28.224'
-  INPUT_TOPIC = 'leader_board_it_input_topic'
-  INPUT_SUB = 'leader_board_it_input_subscription'
+  INPUT_TOPIC = 'game_stats_it_input_topic'
+  INPUT_SUB = 'game_stats_it_input_subscription'
 
   # SHA-1 hash generated from sorted rows reading from BigQuery table
-  DEFAULT_EXPECTED_CHECKSUM = 'de00231fe6730b972c0ff60a99988438911cda53'
-  OUTPUT_DATASET = 'leader_board_it_dataset'
-  OUTPUT_TABLE_USERS = 'leader_board_users'
-  OUTPUT_TABLE_TEAMS = 'leader_board_teams'
+  DEFAULT_EXPECTED_CHECKSUM = '5288ccaab77d347c8460d77c15a0db234ef5eb4f'
+  OUTPUT_DATASET = 'game_stats_it_dataset'
+  OUTPUT_TABLE_SESSIONS = 'game_stats_sessions'
+  OUTPUT_TABLE_TEAMS = 'game_stats_teams'
   DEFAULT_INPUT_COUNT = 500
 
-  WAIT_UNTIL_FINISH_DURATION = 10 * 60 * 1000   # in milliseconds
+  WAIT_UNTIL_FINISH_DURATION = 12 * 60 * 1000   # in milliseconds
 
   def setUp(self):
     self.test_pipeline = TestPipeline(is_integration_test=True)
@@ -105,43 +104,41 @@ class LeaderBoardIT(unittest.TestCase):
     test_utils.cleanup_subscriptions([self.input_sub])
     test_utils.cleanup_topics([self.input_topic])
 
+  def _cleanup_dataset(self):
+    self.dataset.delete()
+
   @attr('IT')
-  def test_leader_board_it(self):
+  def test_game_stats_it(self):
     state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
 
-    success_condition = 'total_score=5000 LIMIT 1'
-    users_query = ('SELECT total_score FROM [%s:%s.%s] '
-                   'WHERE %s' % (self.project,
-                                 self.dataset.name,
-                                 self.OUTPUT_TABLE_USERS,
-                                 success_condition))
-    bq_users_verifier = BigqueryMatcher(self.project,
-                                        users_query,
-                                        self.DEFAULT_EXPECTED_CHECKSUM)
-
-    teams_query = ('SELECT total_score FROM [%s:%s.%s] '
-                   'WHERE %s' % (self.project,
-                                 self.dataset.name,
-                                 self.OUTPUT_TABLE_TEAMS,
-                                 success_condition))
-    bq_teams_verifier = BigqueryMatcher(self.project,
-                                        teams_query,
-                                        self.DEFAULT_EXPECTED_CHECKSUM)
+    success_condition = 'mean_duration=300 LIMIT 1'
+    sessions_query = ('SELECT mean_duration FROM [%s:%s.%s] '
+                      'WHERE %s' % (self.project,
+                                    self.dataset.name,
+                                    self.OUTPUT_TABLE_SESSIONS,
+                                    success_condition))
+    bq_sessions_verifier = BigqueryMatcher(self.project,
+                                           sessions_query,
+                                           self.DEFAULT_EXPECTED_CHECKSUM)
+
+    # TODO(mariagh): Add teams table verifier once game_stats.py is fixed.
 
     extra_opts = {'subscription': self.input_sub.full_name,
                   'dataset': self.dataset.name,
                   'topic': self.input_topic.full_name,
-                  'team_window_duration': 1,
+                  'fixed_window_duration': 1,
+                  'user_activity_window_duration': 1,
                   'wait_until_finish_duration':
                       self.WAIT_UNTIL_FINISH_DURATION,
                   'on_success_matcher': all_of(state_verifier,
-                                               bq_users_verifier,
-                                               bq_teams_verifier)}
+                                               bq_sessions_verifier)}
 
     # Register cleanup before pipeline execution.
+    # Note that actual execution happens in reverse order.
     self.addCleanup(self._cleanup_pubsub)
+    self.addCleanup(self._cleanup_dataset)
     self.addCleanup(utils.delete_bq_table, self.project,
-                    self.dataset.name, self.OUTPUT_TABLE_USERS)
+                    self.dataset.name, self.OUTPUT_TABLE_SESSIONS)
     self.addCleanup(utils.delete_bq_table, self.project,
                     self.dataset.name, self.OUTPUT_TABLE_TEAMS)
 
@@ -152,7 +149,7 @@ class LeaderBoardIT(unittest.TestCase):
 
     # Get pipeline options from command argument: --test-pipeline-options,
     # and start pipeline job by calling pipeline main function.
-    leader_board.run(
+    game_stats.run(
         self.test_pipeline.get_full_options_as_args(**extra_opts))
 
 
diff --git a/sdks/python/apache_beam/examples/hourly_team_score_it_test.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py
similarity index 95%
rename from sdks/python/apache_beam/examples/hourly_team_score_it_test.py
rename to sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py
index e46bf47..584aa05 100644
--- a/sdks/python/apache_beam/examples/hourly_team_score_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py
@@ -66,6 +66,9 @@ class HourlyTeamScoreIT(unittest.TestCase):
     self.dataset = client.dataset(unique_dataset_name, project=self.project)
     self.dataset.create()
 
+  def _cleanup_dataset(self):
+    self.dataset.delete()
+
   @attr('IT')
   def test_hourly_team_score_it(self):
     state_verifier = PipelineStateMatcher(PipelineState.DONE)
@@ -84,6 +87,8 @@ class HourlyTeamScoreIT(unittest.TestCase):
                                                bigquery_verifier)}
 
     # Register clean up before pipeline execution
+    # Note that actual execution happens in reverse order.
+    self.addCleanup(self._cleanup_dataset)
     self.addCleanup(utils.delete_bq_table, self.project,
                     self.dataset.name, self.OUTPUT_TABLE)
 
diff --git a/sdks/python/apache_beam/examples/leader_board_it_test.py b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
similarity index 97%
rename from sdks/python/apache_beam/examples/leader_board_it_test.py
rename to sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
index 44297bd..ab10942 100644
--- a/sdks/python/apache_beam/examples/leader_board_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
@@ -105,6 +105,9 @@ class LeaderBoardIT(unittest.TestCase):
     test_utils.cleanup_subscriptions([self.input_sub])
     test_utils.cleanup_topics([self.input_topic])
 
+  def _cleanup_dataset(self):
+    self.dataset.delete()
+
   @attr('IT')
   def test_leader_board_it(self):
     state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
@@ -139,7 +142,9 @@ class LeaderBoardIT(unittest.TestCase):
                                                bq_teams_verifier)}
 
     # Register cleanup before pipeline execution.
+    # Note that actual execution happens in reverse order.
     self.addCleanup(self._cleanup_pubsub)
+    self.addCleanup(self._cleanup_dataset)
     self.addCleanup(utils.delete_bq_table, self.project,
                     self.dataset.name, self.OUTPUT_TABLE_USERS)
     self.addCleanup(utils.delete_bq_table, self.project,
diff --git a/sdks/python/apache_beam/examples/user_score_it_test.py b/sdks/python/apache_beam/examples/complete/game/user_score_it_test.py
similarity index 100%
rename from sdks/python/apache_beam/examples/user_score_it_test.py
rename to sdks/python/apache_beam/examples/complete/game/user_score_it_test.py

Reply via email to