This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5e0f81a [BEAM-4333] Add integration tests for python mobile games (#5630)
5e0f81a is described below
commit 5e0f81a9b4bceb5e92aa728ef1997863aeff925f
Author: mariapython <[email protected]>
AuthorDate: Thu Jun 14 10:41:57 2018 -0700
[BEAM-4333] Add integration tests for python mobile games (#5630)
* Add integration tests for python mobile games
---
.../examples/complete/game/leader_board.py | 15 +-
.../examples/hourly_team_score_it_test.py | 96 +++++++++++++
.../apache_beam/examples/leader_board_it_test.py | 158 +++++++++++++++++++++
.../apache_beam/examples/user_score_it_test.py | 84 +++++++++++
.../apache_beam/io/gcp/tests/bigquery_matcher.py | 7 +
5 files changed, 358 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index 99a8e09..27ef16d 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -257,6 +257,9 @@ def run(argv=None):
type=str,
required=True,
help='Pub/Sub topic to read from')
+ parser.add_argument('--subscription',
+ type=str,
+ help='Pub/Sub subscription to read from')
parser.add_argument('--dataset',
type=str,
required=True,
@@ -295,9 +298,17 @@ def run(argv=None):
with beam.Pipeline(options=options) as p:
# Read game events from Pub/Sub using custom timestamps, which are
extracted
# from the pubsub data elements, and parse the data.
+
+ # Read from PubSub into a PCollection.
+ if args.subscription:
+ scores = p | 'ReadPubSub' >> beam.io.ReadStringsFromPubSub(
+ subscription=args.subscription)
+ else:
+ scores = p | 'ReadPubSub' >> beam.io.ReadStringsFromPubSub(
+ topic=args.topic)
+
events = (
- p
- | 'ReadPubSub' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(args.topic)
+ scores
| 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn())
| 'AddEventTimestamps' >> beam.Map(
lambda elem: beam.window.TimestampedValue(elem,
elem['timestamp'])))
diff --git a/sdks/python/apache_beam/examples/hourly_team_score_it_test.py b/sdks/python/apache_beam/examples/hourly_team_score_it_test.py
new file mode 100644
index 0000000..ffea48e
--- /dev/null
+++ b/sdks/python/apache_beam/examples/hourly_team_score_it_test.py
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the hourly team score example.
+
+Code: beam/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
+Usage:
+
+  python setup.py nosetests --test-pipeline-options=" \
+      --runner=TestDataflowRunner \
+      --project=... \
+      --staging_location=gs://... \
+      --temp_location=gs://... \
+      --sdk_location=..."
+
+The test creates a new, timestamp-suffixed BigQuery dataset in --project
+and verifies the pipeline's output table in it.
+"""
+
+import logging
+import time
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples.complete.game import hourly_team_score
+from apache_beam.io.gcp.tests import utils
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+
+class HourlyTeamScoreIT(unittest.TestCase):
+  """Runs hourly_team_score end to end and checks its BigQuery output.
+
+  Every run writes into its own timestamp-suffixed dataset so repeated runs
+  do not collide. Both the output table and the dataset are removed again
+  through cleanups registered with addCleanup.
+  """
+
+  DEFAULT_INPUT_FILE = 'gs://dataflow-samples/game/gaming_data*'
+  # SHA-1 hash generated from sorted rows reading from BigQuery table
+  DEFAULT_EXPECTED_CHECKSUM = '4fa761fb5c3341ec573d5d12c6ab75e3b2957a25'
+  OUTPUT_DATASET = 'hourly_team_score_it_dataset'
+  # No table option is passed to the pipeline below, so this must stay in
+  # sync with the default table name hourly_team_score writes to.
+  OUTPUT_TABLE = 'leader_board'
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+
+    # Set up BigQuery environment. The dataset name carries a timestamp
+    # suffix so every run gets a dataset of its own.
+    from google.cloud import bigquery
+    client = bigquery.Client()
+    unique_dataset_name = self.OUTPUT_DATASET + str(int(time.time()))
+    self.dataset = client.dataset(unique_dataset_name, project=self.project)
+    self.dataset.create()
+    # Without this every run would leave an empty dataset behind. Cleanups
+    # run LIFO, so this executes after the table cleanup registered in the
+    # test method, i.e. once the dataset is empty again.
+    self.addCleanup(self.dataset.delete)
+
+  @attr('IT')
+  def test_hourly_team_score_it(self):
+    """Runs the pipeline; expects state DONE and a matching BQ checksum."""
+    state_verifier = PipelineStateMatcher(PipelineState.DONE)
+    query = ('SELECT COUNT(*) FROM [%s:%s.%s]' % (self.project,
+                                                  self.dataset.name,
+                                                  self.OUTPUT_TABLE))
+
+    bigquery_verifier = BigqueryMatcher(self.project,
+                                        query,
+                                        self.DEFAULT_EXPECTED_CHECKSUM)
+
+    extra_opts = {'input': self.DEFAULT_INPUT_FILE,
+                  'dataset': self.dataset.name,
+                  'window_duration': 1,
+                  'on_success_matcher': all_of(state_verifier,
+                                               bigquery_verifier)}
+
+    # Register clean up before pipeline execution
+    self.addCleanup(utils.delete_bq_table, self.project,
+                    self.dataset.name, self.OUTPUT_TABLE)
+
+    # Get pipeline options from command argument: --test-pipeline-options,
+    # and start pipeline job by calling pipeline main function.
+    hourly_team_score.run(
+        self.test_pipeline.get_full_options_as_args(**extra_opts))
+
+
+if __name__ == '__main__':
+  # Verbose logging when the module is executed directly.
+  root_logger = logging.getLogger()
+  root_logger.setLevel(logging.DEBUG)
+  unittest.main()
diff --git a/sdks/python/apache_beam/examples/leader_board_it_test.py b/sdks/python/apache_beam/examples/leader_board_it_test.py
new file mode 100644
index 0000000..12c3972
--- /dev/null
+++ b/sdks/python/apache_beam/examples/leader_board_it_test.py
@@ -0,0 +1,158 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the leader board example.
+
+Code: beam/sdks/python/apache_beam/examples/complete/game/leader_board.py
+Usage:
+
+  python setup.py nosetests --test-pipeline-options=" \
+      --runner=TestDataflowRunner \
+      --project=... \
+      --staging_location=gs://... \
+      --temp_location=gs://... \
+      --sdk_location=..."
+
+The test creates its own Pub/Sub topic and subscription plus a BigQuery
+dataset in --project, publishes game events, and checks the pipeline's
+BigQuery output while the job is still running.
+"""
+
+import logging
+import time
+import unittest
+import uuid
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples.complete.game import leader_board
+from apache_beam.io.gcp.tests import utils
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+
+class LeaderBoardIT(unittest.TestCase):
+  """Runs leader_board against live Pub/Sub and BigQuery resources.
+
+  setUp creates a uniquely named Pub/Sub topic and subscription and a
+  timestamp-suffixed BigQuery dataset, registering a cleanup for each one
+  as soon as it exists. The test publishes identical game events and checks
+  the per-user and per-team totals written to BigQuery.
+  """
+
+  # Comma-separated input event: user, team, score, a timestamp in
+  # milliseconds since the epoch (substituted for %d when publishing) and a
+  # fixed human-readable time string.
+  INPUT_EVENT = 'user1,teamA,10,%d,2015-11-02 09:09:28.224'
+  INPUT_TOPIC = 'leader_board_it_input_topic'
+  INPUT_SUB = 'leader_board_it_input_subscription'
+
+  # SHA-1 hash generated from sorted rows reading from BigQuery table
+  DEFAULT_EXPECTED_CHECKSUM = 'de00231fe6730b972c0ff60a99988438911cda53'
+  OUTPUT_DATASET = 'leader_board_it_dataset'
+  OUTPUT_TABLE_USERS = 'leader_board_users'
+  OUTPUT_TABLE_TEAMS = 'leader_board_teams'
+  DEFAULT_INPUT_COUNT = 500
+
+  WAIT_UNTIL_FINISH_DURATION = 10 * 60 * 1000  # in milliseconds
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    _unique_id = str(uuid.uuid4())
+
+    # Set up PubSub environment. Each resource gets its cleanup registered
+    # right after creation, so a failure later in setUp (or in the test)
+    # cannot leak it; unittest runs these cleanups even if setUp fails.
+    # Cleanups run LIFO, so the subscription is removed before its topic.
+    from google.cloud import pubsub
+    self.pubsub_client = pubsub.Client(project=self.project)
+    unique_topic_name = self.INPUT_TOPIC + _unique_id
+    unique_subscription_name = self.INPUT_SUB + _unique_id
+    self.input_topic = self.pubsub_client.topic(unique_topic_name)
+    self.input_sub = self.input_topic.subscription(unique_subscription_name)
+
+    self.input_topic.create()
+    self.addCleanup(test_utils.cleanup_topics, [self.input_topic])
+    test_utils.wait_for_topics_created([self.input_topic])
+
+    self.input_sub.create()
+    self.addCleanup(test_utils.cleanup_subscriptions, [self.input_sub])
+    test_utils.wait_for_subscriptions_created([self.input_sub])
+
+    # Set up BigQuery environment
+    from google.cloud import bigquery
+    client = bigquery.Client()
+    unique_dataset_name = self.OUTPUT_DATASET + str(int(time.time()))
+    self.dataset = client.dataset(unique_dataset_name, project=self.project)
+    self.dataset.create()
+    # Runs after the table cleanups registered in the test (LIFO order),
+    # i.e. once the dataset is empty again.
+    self.addCleanup(self.dataset.delete)
+
+    # Timestamp (ms since the epoch) shared by every injected event.
+    self._test_timestamp = int(time.time() * 1000)
+
+  def _inject_pubsub_game_events(self, topic, message_count):
+    """Inject game events as test data to PubSub.
+
+    Args:
+      topic: Pub/Sub topic object to publish to.
+      message_count: number of identical events to publish.
+    """
+    logging.debug('Injecting %d game events to topic %s',
+                  message_count, topic.full_name)
+
+    for _ in range(message_count):
+      topic.publish(self.INPUT_EVENT % self._test_timestamp)
+
+  @attr('IT')
+  def test_leader_board_it(self):
+    """Publishes events and checks user and team totals in BigQuery."""
+    # The pipeline reads from Pub/Sub and keeps running, so success means
+    # the job is still in state RUNNING when the matchers are evaluated.
+    state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
+
+    # DEFAULT_INPUT_COUNT (500) events with score 10 give a total of 5000
+    # for both user1 and teamA. LIMIT 1 keeps a single matching row, so both
+    # tables are checked against the same expected checksum.
+    success_condition = 'total_score=5000 LIMIT 1'
+    users_query = ('SELECT total_score FROM [%s:%s.%s] '
+                   'WHERE %s' % (self.project,
+                                 self.dataset.name,
+                                 self.OUTPUT_TABLE_USERS,
+                                 success_condition))
+    bq_users_verifier = BigqueryMatcher(self.project,
+                                        users_query,
+                                        self.DEFAULT_EXPECTED_CHECKSUM)
+
+    teams_query = ('SELECT total_score FROM [%s:%s.%s] '
+                   'WHERE %s' % (self.project,
+                                 self.dataset.name,
+                                 self.OUTPUT_TABLE_TEAMS,
+                                 success_condition))
+    bq_teams_verifier = BigqueryMatcher(self.project,
+                                        teams_query,
+                                        self.DEFAULT_EXPECTED_CHECKSUM)
+
+    # leader_board requires --topic even when --subscription is given; the
+    # subscription is what it actually reads from.
+    extra_opts = {'subscription': self.input_sub.full_name,
+                  'dataset': self.dataset.name,
+                  'topic': self.input_topic.full_name,
+                  'team_window_duration': 1,
+                  'wait_until_finish_duration':
+                      self.WAIT_UNTIL_FINISH_DURATION,
+                  'on_success_matcher': all_of(state_verifier,
+                                               bq_users_verifier,
+                                               bq_teams_verifier)}
+
+    # Register cleanup before pipeline execution. Pub/Sub resources and the
+    # dataset already have cleanups registered in setUp.
+    self.addCleanup(utils.delete_bq_table, self.project,
+                    self.dataset.name, self.OUTPUT_TABLE_USERS)
+    self.addCleanup(utils.delete_bq_table, self.project,
+                    self.dataset.name, self.OUTPUT_TABLE_TEAMS)
+
+    # Generate input data and inject to PubSub. setUp already waited for the
+    # subscription to exist, so it receives every message published here.
+    self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT)
+
+    # Get pipeline options from command argument: --test-pipeline-options,
+    # and start pipeline job by calling pipeline main function.
+    leader_board.run(
+        self.test_pipeline.get_full_options_as_args(**extra_opts))
+
+
+if __name__ == '__main__':
+  # Verbose logging when the module is executed directly.
+  root_logger = logging.getLogger()
+  root_logger.setLevel(logging.DEBUG)
+  unittest.main()
diff --git a/sdks/python/apache_beam/examples/user_score_it_test.py b/sdks/python/apache_beam/examples/user_score_it_test.py
new file mode 100644
index 0000000..7cd59e9
--- /dev/null
+++ b/sdks/python/apache_beam/examples/user_score_it_test.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the user score example.
+
+Code: beam/sdks/python/apache_beam/examples/complete/game/user_score.py
+Usage:
+
+  python setup.py nosetests --test-pipeline-options=" \
+      --runner=TestDataflowRunner \
+      --project=... \
+      --staging_location=gs://... \
+      --temp_location=gs://... \
+      --output=gs://... \
+      --sdk_location=..."
+
+Results are written under <output>/<random uuid>/results.
+"""
+
+import logging
+import unittest
+import uuid
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples.complete.game import user_score
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_utils import delete_files
+
+
+class UserScoreIT(unittest.TestCase):
+  """Runs user_score end to end and checks the checksum of its text output.
+
+  Output goes under <output>/<uuid>/results so concurrent runs never share
+  files; everything under that prefix is deleted again via addCleanup.
+  """
+
+  DEFAULT_INPUT_FILE = 'gs://dataflow-samples/game/gaming_data*'
+  # Expected checksum of the output files, as computed by FileChecksumMatcher.
+  DEFAULT_EXPECTED_CHECKSUM = '9f3bd81669607f0d98ec80ddd477f3277cfba0a2'
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.uuid = str(uuid.uuid4())
+
+    self.output = '/'.join([self.test_pipeline.get_option('output'),
+                            self.uuid,
+                            'results'])
+
+  @attr('IT')
+  def test_user_score_it(self):
+    """Runs the pipeline; expects state DONE and a matching file checksum."""
+    state_verifier = PipelineStateMatcher(PipelineState.DONE)
+
+    # Build the matcher glob from the same prefix passed as --output. The
+    # previous '<results>*-of-*' glob only matched the files written under
+    # '<results>/user-score...' when '*' also matched '/', which e.g. local
+    # filesystem globbing does not do.
+    output_prefix = self.output + '/user-score'
+    file_verifier = FileChecksumMatcher(output_prefix + '*-of-*',
+                                        self.DEFAULT_EXPECTED_CHECKSUM)
+
+    extra_opts = {'input': self.DEFAULT_INPUT_FILE,
+                  'output': output_prefix,
+                  'on_success_matcher': all_of(state_verifier,
+                                               file_verifier)}
+
+    # Register clean up before pipeline execution
+    self.addCleanup(delete_files, [self.output + '*'])
+
+    # Get pipeline options from command argument: --test-pipeline-options,
+    # and start pipeline job by calling pipeline main function.
+    user_score.run(
+        self.test_pipeline.get_full_options_as_args(**extra_opts))
+
+
+if __name__ == '__main__':
+  # Verbose logging when the module is executed directly.
+  root_logger = logging.getLogger()
+  root_logger.setLevel(logging.DEBUG)
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index d6f0e97..8241a22 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -52,6 +52,13 @@ class BigqueryMatcher(BaseMatcher):
"""
def __init__(self, project, query, checksum):
+ """Initialize BigQueryMatcher object.
+ Args:
+ project: The name (string) of the project.
+ query: The query (string) to perform.
+ checksum: SHA-1 hash generated from a sorted list of lines
+ read from expected output.
+ """
if bigquery is None:
raise ImportError(
'Bigquery dependencies are not installed.')
--
To stop receiving notification emails like this one, please contact
[email protected].