Repository: beam Updated Branches: refs/heads/master 72bd73ab3 -> f731de0b3
[BEAM-1584] Add clean up in bigquery integration test Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fc00ce90 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fc00ce90 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fc00ce90 Branch: refs/heads/master Commit: fc00ce90060ccd99b69ff7225e3e35d259d3c7ad Parents: 72bd73a Author: Mark Liu <[email protected]> Authored: Tue Aug 1 17:46:05 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Fri Aug 4 09:13:49 2017 -0700 ---------------------------------------------------------------------- .../cookbook/bigquery_tornadoes_it_test.py | 14 +++- sdks/python/apache_beam/io/gcp/tests/utils.py | 63 ++++++++++++++++++ .../apache_beam/io/gcp/tests/utils_test.py | 70 ++++++++++++++++++++ 3 files changed, 144 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fc00ce90/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py index 5d2ee7c..05ee3c5 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py @@ -26,6 +26,7 @@ from nose.plugins.attrib import attr from apache_beam.examples.cookbook import bigquery_tornadoes from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher +from apache_beam.io.gcp.tests import utils from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher from apache_beam.testing.test_pipeline import TestPipeline @@ -44,17 +45,24 @@ class BigqueryTornadoesIT(unittest.TestCase): test_pipeline = TestPipeline(is_integration_test=True) # Set extra options to 
the pipeline for test purpose - output_table = ('BigQueryTornadoesIT' - '.monthly_tornadoes_%s' % int(round(time.time() * 1000))) + project = test_pipeline.get_option('project') + + dataset = 'BigQueryTornadoesIT' + table = 'monthly_tornadoes_%s' % int(round(time.time() * 1000)) + output_table = '.'.join([dataset, table]) query = 'SELECT month, tornado_count FROM [%s]' % output_table + pipeline_verifiers = [PipelineStateMatcher(), BigqueryMatcher( - project=test_pipeline.get_option('project'), + project=project, query=query, checksum=self.DEFAULT_CHECKSUM)] extra_opts = {'output': output_table, 'on_success_matcher': all_of(*pipeline_verifiers)} + # Register cleanup before pipeline execution. + self.addCleanup(utils.delete_bq_table, project, dataset, table) + # Get pipeline options from command argument: --test-pipeline-options, # and start pipeline job by calling pipeline main function. bigquery_tornadoes.run( http://git-wip-us.apache.org/repos/asf/beam/blob/fc00ce90/sdks/python/apache_beam/io/gcp/tests/utils.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/tests/utils.py b/sdks/python/apache_beam/io/gcp/tests/utils.py new file mode 100644 index 0000000..40eb975 --- /dev/null +++ b/sdks/python/apache_beam/io/gcp/tests/utils.py @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


"""Utility methods for testing on GCP."""

import logging

from apache_beam.utils import retry

# Protect against environments where bigquery library is not available.
try:
  from google.cloud import bigquery
except ImportError:
  bigquery = None


class GcpTestIOError(retry.PermanentException):
  """Basic GCP IO error for testing.

  A function that raises this error should not be retried; it derives from
  retry.PermanentException for that reason.
  """


@retry.with_exponential_backoff(
    num_retries=3,
    retry_filter=retry.retry_on_server_errors_filter)
def delete_bq_table(project, dataset, table):
  """Delete a Bigquery table.

  Args:
    project: Name of the project.
    dataset: Name of the dataset where table is.
    table: Name of the table.

  Raises:
    GcpTestIOError: If the bigquery library is not importable, or if the
      dataset or the table does not exist before the delete is attempted.
    RuntimeError: If the table still exists after the delete call.
  """
  logging.info('Clean up a Bigquery table with project: %s, dataset: %s, '
               'table: %s.', project, dataset, table)
  if bigquery is None:
    # The guarded import at module level failed. Report that explicitly
    # instead of failing with an AttributeError on None below.
    raise GcpTestIOError('Failed to cleanup. Bigquery library is not '
                         'available.')
  bq_dataset = bigquery.Client(project=project).dataset(dataset)
  if not bq_dataset.exists():
    # The format arguments must be a single tuple. Without the parentheses
    # the % operator only receives the first value and raises TypeError.
    raise GcpTestIOError('Failed to cleanup. Bigquery dataset %s doesn\'t '
                         'exist in project %s.' % (dataset, project))
  bq_table = bq_dataset.table(table)
  if not bq_table.exists():
    raise GcpTestIOError('Failed to cleanup. Bigquery table %s doesn\'t '
                         'exist in project %s, dataset %s.' %
                         (table, project, dataset))
  bq_table.delete()
  # Verify the deletion took effect rather than trusting the delete call.
  if bq_table.exists():
    raise RuntimeError('Failed to cleanup. Bigquery table %s still exists '
                       'after cleanup.' % table)


# ----------------------------------------------------------------------
# http://git-wip-us.apache.org/repos/asf/beam/blob/fc00ce90/sdks/python/apache_beam/io/gcp/tests/utils_test.py
# ----------------------------------------------------------------------
# diff --git a/sdks/python/apache_beam/io/gcp/tests/utils_test.py
#            b/sdks/python/apache_beam/io/gcp/tests/utils_test.py
# new file mode 100644
# index 0000000..270750a
# --- /dev/null
# +++ b/sdks/python/apache_beam/io/gcp/tests/utils_test.py
# @@ -0,0 +1,70 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Unittest for GCP testing utils."""

import logging
import unittest
from mock import Mock, patch

from apache_beam.io.gcp.tests import utils
from apache_beam.testing.test_utils import patch_retry

# Protect against environments where bigquery library is not available.
try:
  from google.cloud import bigquery
except ImportError:
  bigquery = None


@unittest.skipIf(bigquery is None, 'Bigquery dependencies are not installed.')
class UtilsTest(unittest.TestCase):
  """Tests for utils.delete_bq_table with the Bigquery client calls mocked.

  Each test patches Dataset.exists, Table.exists and Table.delete so that no
  request is sent to the Bigquery service.
  """

  def setUp(self):
    self._mock_result = Mock()
    patch_retry(self, utils)

  @patch('google.cloud.bigquery.Table.delete')
  @patch('google.cloud.bigquery.Table.exists', side_effect=[True, False])
  @patch('google.cloud.bigquery.Dataset.exists', return_value=True)
  def test_delete_bq_table_succeeds(self, *_):
    # Table.exists reports True before the delete and False afterwards, so
    # the call is expected to return without raising.
    utils.delete_bq_table('unused_project',
                          'unused_dataset',
                          'unused_table')

  @patch('google.cloud.bigquery.Table.delete', side_effect=Exception)
  @patch('google.cloud.bigquery.Table.exists', return_value=True)
  @patch('google.cloud.bigquery.Dataset.exists', return_value=True)
  def test_delete_bq_table_fails_with_server_error(self, *_):
    # The delete call itself raises; the error must reach the caller.
    with self.assertRaises(Exception):
      utils.delete_bq_table('unused_project',
                            'unused_dataset',
                            'unused_table')

  # A constant return_value (not a finite side_effect list) keeps answering
  # True no matter how many times exists() is called, e.g. if the retry
  # decorator invokes the function more than once -- TODO confirm against
  # patch_retry's behavior.
  @patch('google.cloud.bigquery.Table.delete')
  @patch('google.cloud.bigquery.Table.exists', return_value=True)
  @patch('google.cloud.bigquery.Dataset.exists', return_value=True)
  def test_delete_bq_table_fails_with_delete_error(self, *_):
    # The table still "exists" after delete, which must surface as
    # RuntimeError.
    with self.assertRaises(RuntimeError):
      utils.delete_bq_table('unused_project',
                            'unused_dataset',
                            'unused_table')


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()
