This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2ed527b [BEAM-4333] Add integration tests for python mobile games (#5747)
2ed527b is described below
commit 2ed527b880a9e408123974148fbff04da374bba3
Author: mariapython <[email protected]>
AuthorDate: Mon Jun 25 17:50:15 2018 -0700
[BEAM-4333] Add integration tests for python mobile games (#5747)
Add game_stats_it_test.py
Fix datasets in *_it_tests not deleted
Relocate *_it_tests in the same directory as the code they test
---
.../examples/complete/game/game_stats.py | 15 ++++-
.../game/game_stats_it_test.py} | 71 +++++++++++-----------
.../game}/hourly_team_score_it_test.py | 5 ++
.../{ => complete/game}/leader_board_it_test.py | 5 ++
.../{ => complete/game}/user_score_it_test.py | 0
5 files changed, 56 insertions(+), 40 deletions(-)
diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats.py b/sdks/python/apache_beam/examples/complete/game/game_stats.py
index e370764..69f5250 100644
--- a/sdks/python/apache_beam/examples/complete/game/game_stats.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats.py
@@ -241,6 +241,9 @@ def run(argv=None):
type=str,
required=True,
help='Pub/Sub topic to read from')
+ parser.add_argument('--subscription',
+ type=str,
+ help='Pub/Sub subscription to read from')
parser.add_argument('--dataset',
type=str,
required=True,
@@ -288,10 +291,16 @@ def run(argv=None):
options.view_as(StandardOptions).streaming = True
with beam.Pipeline(options=options) as p:
- # Read events from Pub/Sub using custom timestamps
+ # Read game events from Pub/Sub using custom timestamps, which
+ # are extracted from the data elements, and parse the data.
+ if args.subscription:
+ scores = p | 'ReadPubSub' >> beam.io.ReadStringsFromPubSub(
+ subscription=args.subscription)
+ else:
+ scores = p | 'ReadPubSub' >> beam.io.ReadStringsFromPubSub(
+ topic=args.topic)
raw_events = (
- p
- | 'ReadPubSub' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(args.topic)
+ scores
| 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn())
| 'AddEventTimestamps' >> beam.Map(
lambda elem: beam.window.TimestampedValue(elem,
elem['timestamp'])))
diff --git a/sdks/python/apache_beam/examples/leader_board_it_test.py b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
similarity index 69%
copy from sdks/python/apache_beam/examples/leader_board_it_test.py
copy to sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
index 44297bd..6dc60d0 100644
--- a/sdks/python/apache_beam/examples/leader_board_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
@@ -15,9 +15,9 @@
# limitations under the License.
#
-"""End-to-end test for the leader board example.
+"""End-to-end test for the game stats example.
-Code: beam/sdks/python/apache_beam/examples/complete/game/leader_board.py
+Code: beam/sdks/python/apache_beam/examples/complete/game/game_stats.py
Usage:
python setup.py nosetests --test-pipeline-options=" \
@@ -36,12 +36,11 @@ import logging
import time
import unittest
import uuid
-from builtins import range
from hamcrest.core.core.allof import all_of
from nose.plugins.attrib import attr
-from apache_beam.examples.complete.game import leader_board
+from apache_beam.examples.complete.game import game_stats
from apache_beam.io.gcp.tests import utils
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
from apache_beam.runners.runner import PipelineState
@@ -50,21 +49,21 @@ from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline
-class LeaderBoardIT(unittest.TestCase):
+class GameStatsIT(unittest.TestCase):
- # Input event containing user, team, score, processing time, window start.
+ # Input events containing user, team, score, processing time, window start.
INPUT_EVENT = 'user1,teamA,10,%d,2015-11-02 09:09:28.224'
- INPUT_TOPIC = 'leader_board_it_input_topic'
- INPUT_SUB = 'leader_board_it_input_subscription'
+ INPUT_TOPIC = 'game_stats_it_input_topic'
+ INPUT_SUB = 'game_stats_it_input_subscription'
# SHA-1 hash generated from sorted rows reading from BigQuery table
- DEFAULT_EXPECTED_CHECKSUM = 'de00231fe6730b972c0ff60a99988438911cda53'
- OUTPUT_DATASET = 'leader_board_it_dataset'
- OUTPUT_TABLE_USERS = 'leader_board_users'
- OUTPUT_TABLE_TEAMS = 'leader_board_teams'
+ DEFAULT_EXPECTED_CHECKSUM = '5288ccaab77d347c8460d77c15a0db234ef5eb4f'
+ OUTPUT_DATASET = 'game_stats_it_dataset'
+ OUTPUT_TABLE_SESSIONS = 'game_stats_sessions'
+ OUTPUT_TABLE_TEAMS = 'game_stats_teams'
DEFAULT_INPUT_COUNT = 500
- WAIT_UNTIL_FINISH_DURATION = 10 * 60 * 1000 # in milliseconds
+ WAIT_UNTIL_FINISH_DURATION = 12 * 60 * 1000 # in milliseconds
def setUp(self):
self.test_pipeline = TestPipeline(is_integration_test=True)
@@ -105,43 +104,41 @@ class LeaderBoardIT(unittest.TestCase):
test_utils.cleanup_subscriptions([self.input_sub])
test_utils.cleanup_topics([self.input_topic])
+ def _cleanup_dataset(self):
+ self.dataset.delete()
+
@attr('IT')
- def test_leader_board_it(self):
+ def test_game_stats_it(self):
state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
- success_condition = 'total_score=5000 LIMIT 1'
- users_query = ('SELECT total_score FROM [%s:%s.%s] '
- 'WHERE %s' % (self.project,
- self.dataset.name,
- self.OUTPUT_TABLE_USERS,
- success_condition))
- bq_users_verifier = BigqueryMatcher(self.project,
- users_query,
- self.DEFAULT_EXPECTED_CHECKSUM)
-
- teams_query = ('SELECT total_score FROM [%s:%s.%s] '
- 'WHERE %s' % (self.project,
- self.dataset.name,
- self.OUTPUT_TABLE_TEAMS,
- success_condition))
- bq_teams_verifier = BigqueryMatcher(self.project,
- teams_query,
- self.DEFAULT_EXPECTED_CHECKSUM)
+ success_condition = 'mean_duration=300 LIMIT 1'
+ sessions_query = ('SELECT mean_duration FROM [%s:%s.%s] '
+ 'WHERE %s' % (self.project,
+ self.dataset.name,
+ self.OUTPUT_TABLE_SESSIONS,
+ success_condition))
+ bq_sessions_verifier = BigqueryMatcher(self.project,
+ sessions_query,
+ self.DEFAULT_EXPECTED_CHECKSUM)
+
+ # TODO(mariagh): Add teams table verifier once game_stats.py is fixed.
extra_opts = {'subscription': self.input_sub.full_name,
'dataset': self.dataset.name,
'topic': self.input_topic.full_name,
- 'team_window_duration': 1,
+ 'fixed_window_duration': 1,
+ 'user_activity_window_duration': 1,
'wait_until_finish_duration':
self.WAIT_UNTIL_FINISH_DURATION,
'on_success_matcher': all_of(state_verifier,
- bq_users_verifier,
- bq_teams_verifier)}
+ bq_sessions_verifier)}
# Register cleanup before pipeline execution.
+ # Note that actual execution happens in reverse order.
self.addCleanup(self._cleanup_pubsub)
+ self.addCleanup(self._cleanup_dataset)
self.addCleanup(utils.delete_bq_table, self.project,
- self.dataset.name, self.OUTPUT_TABLE_USERS)
+ self.dataset.name, self.OUTPUT_TABLE_SESSIONS)
self.addCleanup(utils.delete_bq_table, self.project,
self.dataset.name, self.OUTPUT_TABLE_TEAMS)
@@ -152,7 +149,7 @@ class LeaderBoardIT(unittest.TestCase):
# Get pipeline options from command argument: --test-pipeline-options,
# and start pipeline job by calling pipeline main function.
- leader_board.run(
+ game_stats.run(
self.test_pipeline.get_full_options_as_args(**extra_opts))
diff --git a/sdks/python/apache_beam/examples/hourly_team_score_it_test.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py
similarity index 95%
rename from sdks/python/apache_beam/examples/hourly_team_score_it_test.py
rename to sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py
index e46bf47..584aa05 100644
--- a/sdks/python/apache_beam/examples/hourly_team_score_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py
@@ -66,6 +66,9 @@ class HourlyTeamScoreIT(unittest.TestCase):
self.dataset = client.dataset(unique_dataset_name, project=self.project)
self.dataset.create()
+ def _cleanup_dataset(self):
+ self.dataset.delete()
+
@attr('IT')
def test_hourly_team_score_it(self):
state_verifier = PipelineStateMatcher(PipelineState.DONE)
@@ -84,6 +87,8 @@ class HourlyTeamScoreIT(unittest.TestCase):
bigquery_verifier)}
# Register clean up before pipeline execution
+ # Note that actual execution happens in reverse order.
+ self.addCleanup(self._cleanup_dataset)
self.addCleanup(utils.delete_bq_table, self.project,
self.dataset.name, self.OUTPUT_TABLE)
diff --git a/sdks/python/apache_beam/examples/leader_board_it_test.py b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
similarity index 97%
rename from sdks/python/apache_beam/examples/leader_board_it_test.py
rename to sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
index 44297bd..ab10942 100644
--- a/sdks/python/apache_beam/examples/leader_board_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
@@ -105,6 +105,9 @@ class LeaderBoardIT(unittest.TestCase):
test_utils.cleanup_subscriptions([self.input_sub])
test_utils.cleanup_topics([self.input_topic])
+ def _cleanup_dataset(self):
+ self.dataset.delete()
+
@attr('IT')
def test_leader_board_it(self):
state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
@@ -139,7 +142,9 @@ class LeaderBoardIT(unittest.TestCase):
bq_teams_verifier)}
# Register cleanup before pipeline execution.
+ # Note that actual execution happens in reverse order.
self.addCleanup(self._cleanup_pubsub)
+ self.addCleanup(self._cleanup_dataset)
self.addCleanup(utils.delete_bq_table, self.project,
self.dataset.name, self.OUTPUT_TABLE_USERS)
self.addCleanup(utils.delete_bq_table, self.project,
diff --git a/sdks/python/apache_beam/examples/user_score_it_test.py b/sdks/python/apache_beam/examples/complete/game/user_score_it_test.py
similarity index 100%
rename from sdks/python/apache_beam/examples/user_score_it_test.py
rename to sdks/python/apache_beam/examples/complete/game/user_score_it_test.py