This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e0f81a  [BEAM-4333] Add integration tests for python mobile games (#5630)
5e0f81a is described below

commit 5e0f81a9b4bceb5e92aa728ef1997863aeff925f
Author: mariapython <[email protected]>
AuthorDate: Thu Jun 14 10:41:57 2018 -0700

    [BEAM-4333] Add integration tests for python mobile games (#5630)
    
    * Add integration tests for python mobile games
---
 .../examples/complete/game/leader_board.py         |  15 +-
 .../examples/hourly_team_score_it_test.py          |  96 +++++++++++++
 .../apache_beam/examples/leader_board_it_test.py   | 158 +++++++++++++++++++++
 .../apache_beam/examples/user_score_it_test.py     |  84 +++++++++++
 .../apache_beam/io/gcp/tests/bigquery_matcher.py   |   7 +
 5 files changed, 358 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index 99a8e09..27ef16d 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -257,6 +257,9 @@ def run(argv=None):
                       type=str,
                       required=True,
                       help='Pub/Sub topic to read from')
+  parser.add_argument('--subscription',
+                      type=str,
+                      help='Pub/Sub subscription to read from')
   parser.add_argument('--dataset',
                       type=str,
                       required=True,
@@ -295,9 +298,17 @@ def run(argv=None):
   with beam.Pipeline(options=options) as p:
     # Read game events from Pub/Sub using custom timestamps, which are extracted
     # from the pubsub data elements, and parse the data.
+
+    # Read from PubSub into a PCollection.
+    if args.subscription:
+      scores = p | 'ReadPubSub' >> beam.io.ReadStringsFromPubSub(
+          subscription=args.subscription)
+    else:
+      scores = p | 'ReadPubSub' >> beam.io.ReadStringsFromPubSub(
+          topic=args.topic)
+
     events = (
-        p
-        | 'ReadPubSub' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(args.topic)
+        scores
         | 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn())
         | 'AddEventTimestamps' >> beam.Map(
             lambda elem: beam.window.TimestampedValue(elem, 
elem['timestamp'])))
diff --git a/sdks/python/apache_beam/examples/hourly_team_score_it_test.py b/sdks/python/apache_beam/examples/hourly_team_score_it_test.py
new file mode 100644
index 0000000..ffea48e
--- /dev/null
+++ b/sdks/python/apache_beam/examples/hourly_team_score_it_test.py
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the hourly team score example.
+
+Code: beam/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
+Usage:
+
+  python setup.py nosetests --test-pipeline-options=" \
+      --runner=TestDataflowRunner \
+      --project=... \
+      --staging_location=gs://... \
+      --temp_location=gs://... \
+      --output=gs://... \
+      --sdk_location=..."
+
+"""
+
+import logging
+import time
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples.complete.game import hourly_team_score
+from apache_beam.io.gcp.tests import utils
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+
+class HourlyTeamScoreIT(unittest.TestCase):
+
+  DEFAULT_INPUT_FILE = 'gs://dataflow-samples/game/gaming_data*'
+  # SHA-1 hash generated from the sorted rows read from the BigQuery table
+  DEFAULT_EXPECTED_CHECKSUM = '4fa761fb5c3341ec573d5d12c6ab75e3b2957a25'
+  OUTPUT_DATASET = 'hourly_team_score_it_dataset'
+  OUTPUT_TABLE = 'leader_board'
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+
+    # Set up BigQuery environment
+    from google.cloud import bigquery
+    client = bigquery.Client()
+    unique_dataset_name = self.OUTPUT_DATASET + str(int(time.time()))
+    self.dataset = client.dataset(unique_dataset_name, project=self.project)
+    self.dataset.create()
+
+  @attr('IT')
+  def test_hourly_team_score_it(self):
+    state_verifier = PipelineStateMatcher(PipelineState.DONE)
+    query = ('SELECT COUNT(*) FROM [%s:%s.%s]' % (self.project,
+                                                  self.dataset.name,
+                                                  self.OUTPUT_TABLE))
+
+    bigquery_verifier = BigqueryMatcher(self.project,
+                                        query,
+                                        self.DEFAULT_EXPECTED_CHECKSUM)
+
+    extra_opts = {'input': self.DEFAULT_INPUT_FILE,
+                  'dataset': self.dataset.name,
+                  'window_duration': 1,
+                  'on_success_matcher': all_of(state_verifier,
+                                               bigquery_verifier)}
+
+    # Register clean up before pipeline execution
+    self.addCleanup(utils.delete_bq_table, self.project,
+                    self.dataset.name, self.OUTPUT_TABLE)
+
+    # Get pipeline options from command argument: --test-pipeline-options,
+    # and start pipeline job by calling pipeline main function.
+    hourly_team_score.run(
+        self.test_pipeline.get_full_options_as_args(**extra_opts))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.DEBUG)
+  unittest.main()
diff --git a/sdks/python/apache_beam/examples/leader_board_it_test.py b/sdks/python/apache_beam/examples/leader_board_it_test.py
new file mode 100644
index 0000000..12c3972
--- /dev/null
+++ b/sdks/python/apache_beam/examples/leader_board_it_test.py
@@ -0,0 +1,158 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the leader board example.
+
+Code: beam/sdks/python/apache_beam/examples/complete/game/leader_board.py
+Usage:
+
+  python setup.py nosetests --test-pipeline-options=" \
+      --runner=TestDataflowRunner \
+      --project=... \
+      --staging_location=gs://... \
+      --temp_location=gs://... \
+      --output=gs://... \
+      --sdk_location=..."
+
+"""
+
+import logging
+import time
+import unittest
+import uuid
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples.complete.game import leader_board
+from apache_beam.io.gcp.tests import utils
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+
+class LeaderBoardIT(unittest.TestCase):
+
+  # Input event: user, team, score, timestamp (ms since epoch), readable time.
+  INPUT_EVENT = 'user1,teamA,10,%d,2015-11-02 09:09:28.224'
+  INPUT_TOPIC = 'leader_board_it_input_topic'
+  INPUT_SUB = 'leader_board_it_input_subscription'
+
+  # SHA-1 hash generated from the sorted rows read from the BigQuery table
+  DEFAULT_EXPECTED_CHECKSUM = 'de00231fe6730b972c0ff60a99988438911cda53'
+  OUTPUT_DATASET = 'leader_board_it_dataset'
+  OUTPUT_TABLE_USERS = 'leader_board_users'
+  OUTPUT_TABLE_TEAMS = 'leader_board_teams'
+  DEFAULT_INPUT_COUNT = 500
+
+  WAIT_UNTIL_FINISH_DURATION = 10 * 60 * 1000   # in milliseconds
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    _unique_id = str(uuid.uuid4())
+
+    # Set up PubSub environment.
+    from google.cloud import pubsub
+    self.pubsub_client = pubsub.Client(project=self.project)
+    unique_topic_name = self.INPUT_TOPIC + _unique_id
+    unique_subscrition_name = self.INPUT_SUB + _unique_id
+    self.input_topic = self.pubsub_client.topic(unique_topic_name)
+    self.input_sub = self.input_topic.subscription(unique_subscrition_name)
+
+    self.input_topic.create()
+    test_utils.wait_for_topics_created([self.input_topic])
+    self.input_sub.create()
+
+    # Set up BigQuery environment
+    from google.cloud import bigquery
+    client = bigquery.Client()
+    unique_dataset_name = self.OUTPUT_DATASET + str(int(time.time()))
+    self.dataset = client.dataset(unique_dataset_name, project=self.project)
+    self.dataset.create()
+
+    self._test_timestamp = int(time.time() * 1000)
+
+  def _inject_pubsub_game_events(self, topic, message_count):
+    """Inject game events as test data to PubSub."""
+
+    logging.debug('Injecting %d game events to topic %s',
+                  message_count, topic.full_name)
+
+    for _ in xrange(message_count):
+      topic.publish(self.INPUT_EVENT % self._test_timestamp)
+
+  def _cleanup_pubsub(self):
+    test_utils.cleanup_subscriptions([self.input_sub])
+    test_utils.cleanup_topics([self.input_topic])
+
+  @attr('IT')
+  def test_leader_board_it(self):
+    state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
+
+    success_condition = 'total_score=5000 LIMIT 1'
+    users_query = ('SELECT total_score FROM [%s:%s.%s] '
+                   'WHERE %s' % (self.project,
+                                 self.dataset.name,
+                                 self.OUTPUT_TABLE_USERS,
+                                 success_condition))
+    bq_users_verifier = BigqueryMatcher(self.project,
+                                        users_query,
+                                        self.DEFAULT_EXPECTED_CHECKSUM)
+
+    teams_query = ('SELECT total_score FROM [%s:%s.%s] '
+                   'WHERE %s' % (self.project,
+                                 self.dataset.name,
+                                 self.OUTPUT_TABLE_TEAMS,
+                                 success_condition))
+    bq_teams_verifier = BigqueryMatcher(self.project,
+                                        teams_query,
+                                        self.DEFAULT_EXPECTED_CHECKSUM)
+
+    extra_opts = {'subscription': self.input_sub.full_name,
+                  'dataset': self.dataset.name,
+                  'topic': self.input_topic.full_name,
+                  'team_window_duration': 1,
+                  'wait_until_finish_duration':
+                      self.WAIT_UNTIL_FINISH_DURATION,
+                  'on_success_matcher': all_of(state_verifier,
+                                               bq_users_verifier,
+                                               bq_teams_verifier)}
+
+    # Register cleanup before pipeline execution.
+    self.addCleanup(self._cleanup_pubsub)
+    self.addCleanup(utils.delete_bq_table, self.project,
+                    self.dataset.name, self.OUTPUT_TABLE_USERS)
+    self.addCleanup(utils.delete_bq_table, self.project,
+                    self.dataset.name, self.OUTPUT_TABLE_TEAMS)
+
+    # Generate input data and inject to PubSub.
+    test_utils.wait_for_subscriptions_created([self.input_topic,
+                                               self.input_sub])
+    self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT)
+
+    # Get pipeline options from command argument: --test-pipeline-options,
+    # and start pipeline job by calling pipeline main function.
+    leader_board.run(
+        self.test_pipeline.get_full_options_as_args(**extra_opts))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.DEBUG)
+  unittest.main()
diff --git a/sdks/python/apache_beam/examples/user_score_it_test.py b/sdks/python/apache_beam/examples/user_score_it_test.py
new file mode 100644
index 0000000..7cd59e9
--- /dev/null
+++ b/sdks/python/apache_beam/examples/user_score_it_test.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the user score example.
+
+Code: beam/sdks/python/apache_beam/examples/complete/game/user_score.py
+Usage:
+
+  python setup.py nosetests --test-pipeline-options=" \
+      --runner=TestDataflowRunner \
+      --project=... \
+      --staging_location=gs://... \
+      --temp_location=gs://... \
+      --output=gs://... \
+      --sdk_location=..."
+
+"""
+
+import logging
+import unittest
+import uuid
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples.complete.game import user_score
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_utils import delete_files
+
+
+class UserScoreIT(unittest.TestCase):
+
+  DEFAULT_INPUT_FILE = 'gs://dataflow-samples/game/gaming_data*'
+  DEFAULT_EXPECTED_CHECKSUM = '9f3bd81669607f0d98ec80ddd477f3277cfba0a2'
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.uuid = str(uuid.uuid4())
+
+    self.output = '/'.join([self.test_pipeline.get_option('output'),
+                            self.uuid,
+                            'results'])
+
+  @attr('IT')
+  def test_user_score_it(self):
+
+    state_verifier = PipelineStateMatcher(PipelineState.DONE)
+    file_verifier = FileChecksumMatcher(self.output + '*-of-*',
+                                        self.DEFAULT_EXPECTED_CHECKSUM)
+
+    extra_opts = {'input': self.DEFAULT_INPUT_FILE,
+                  'output': self.output + '/user-score',
+                  'on_success_matcher': all_of(state_verifier,
+                                               file_verifier)}
+
+    # Register clean up before pipeline execution
+    self.addCleanup(delete_files, [self.output + '*'])
+
+    # Get pipeline options from command argument: --test-pipeline-options,
+    # and start pipeline job by calling pipeline main function.
+    user_score.run(
+        self.test_pipeline.get_full_options_as_args(**extra_opts))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.DEBUG)
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index d6f0e97..8241a22 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -52,6 +52,13 @@ class BigqueryMatcher(BaseMatcher):
   """
 
   def __init__(self, project, query, checksum):
+    """Initialize BigqueryMatcher object.
+    Args:
+      project: The name (string) of the project.
+      query: The query (string) to perform.
+      checksum: SHA-1 hash generated from a sorted list of lines
+        read from expected output.
+    """
     if bigquery is None:
       raise ImportError(
           'Bigquery dependencies are not installed.')

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to