Repository: beam
Updated Branches:
  refs/heads/master b322a5d40 -> de9e8528c
[BEAM-1188] Python Bigquery Verifier For E2E Test


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dd32c266
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dd32c266
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dd32c266

Branch: refs/heads/master
Commit: dd32c26622ef9d0aa9e8d0c3863ac6660ed336b7
Parents: b322a5d
Author: Mark Liu <[email protected]>
Authored: Tue Feb 21 18:48:34 2017 -0800
Committer: Ahmet Altay <[email protected]>
Committed: Thu Mar 2 15:05:27 2017 -0800

----------------------------------------------------------------------
 .../cookbook/bigquery_tornadoes_it_test.py      |  62 +++++++++++
 .../python/apache_beam/io/gcp/tests/__init__.py |  16 +++
 .../io/gcp/tests/bigquery_matcher.py            | 108 +++++++++++++++++++
 .../io/gcp/tests/bigquery_matcher_test.py       | 108 +++++++++++++++++++
 .../apache_beam/tests/pipeline_verifiers.py     |  12 +--
 sdks/python/apache_beam/tests/test_utils.py     |  12 +++
 sdks/python/setup.py                            |   3 +
 7 files changed, 313 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
new file mode 100644
index 0000000..306a09e
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for Bigquery tornadoes example."""
+
+import logging
+import time
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples.cookbook import bigquery_tornadoes
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
+from apache_beam.test_pipeline import TestPipeline
+from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher
+
+
+class BigqueryTornadoesIT(unittest.TestCase):
+
+  # The default checksum is a SHA-1 hash generated from sorted rows reading
+  # from expected Bigquery table.
+  DEFAULT_CHECKSUM = '83789a7c1bca7959dcf23d3bc37e9204e594330f'
+
+  @attr('IT')
+  def test_bigquery_tornadoes_it(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+
+    # Set extra options to the pipeline for test purpose
+    output_table = ('BigQueryTornadoesIT'
+                    '.monthly_tornadoes_%s' % int(round(time.time() * 1000)))
+    query = 'SELECT month, tornado_count FROM [%s]' % output_table
+    pipeline_verifiers = [PipelineStateMatcher(),
+                          BigqueryMatcher(
+                              project=test_pipeline.get_option('project'),
+                              query=query,
+                              checksum=self.DEFAULT_CHECKSUM)]
+    extra_opts = {'output': output_table,
+                  'on_success_matcher': all_of(*pipeline_verifiers)}
+
+    # Get pipeline options from command argument: --test-pipeline-options,
+    # and start pipeline job by calling pipeline main function.
+    bigquery_tornadoes.run(
+        test_pipeline.get_full_options_as_args(**extra_opts))
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/io/gcp/tests/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/__init__.py b/sdks/python/apache_beam/io/gcp/tests/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/tests/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
new file mode 100644
index 0000000..cc26689
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Bigquery data verifier for end-to-end test."""
+
+import logging
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+from apache_beam.tests.test_utils import compute_hash
+from apache_beam.utils import retry
+
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import bigquery
+  from google.cloud.exceptions import GoogleCloudError
+except ImportError:
+  bigquery = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+MAX_RETRIES = 4
+
+
+def retry_on_http_and_value_error(exception):
+  """Filter allowing retries on Bigquery errors and value error."""
+  return isinstance(exception, GoogleCloudError) or \
+         isinstance(exception, ValueError)
+
+
+class BigqueryMatcher(BaseMatcher):
+  """Matcher that verifies Bigquery data with given query.
+
+  Fetch Bigquery data with given query, compute a hash string and compare
+  with expected checksum.
+  """
+
+  def __init__(self, project, query, checksum):
+    if bigquery is None:
+      raise ImportError(
+          'Bigquery dependencies are not installed.')
+    if not query or not isinstance(query, str):
+      raise ValueError(
+          'Invalid argument: query. Please use non-empty string')
+    if not checksum or not isinstance(checksum, str):
+      raise ValueError(
+          'Invalid argument: checksum. Please use non-empty string')
+    self.project = project
+    self.query = query
+    self.expected_checksum = checksum
+
+  def _matches(self, _):
+    logging.info('Start verify Bigquery data.')
+    # Run query
+    bigquery_client = bigquery.Client(project=self.project)
+    response = self._query_with_retry(bigquery_client)
+    logging.info('Read from given query (%s), total rows %d',
+                 self.query, len(response))
+
+    # Compute checksum
+    self.checksum = compute_hash(response)
+    logging.info('Generate checksum: %s', self.checksum)
+
+    # Verify result
+    return self.checksum == self.expected_checksum
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry_on_http_and_value_error)
+  def _query_with_retry(self, bigquery_client):
+    """Run Bigquery query with retry if got error http response"""
+    query = bigquery_client.run_sync_query(self.query)
+    query.run()
+
+    # Fetch query data one page at a time.
+    page_token = None
+    results = []
+    while True:
+      rows, _, page_token = query.fetch_data(page_token=page_token)
+      results.extend(rows)
+      if not page_token:
+        break
+
+    return results
+
+  def describe_to(self, description):
+    description \
+      .append_text("Expected checksum is ") \
+      .append_text(self.expected_checksum)
+
+  def describe_mismatch(self, pipeline_result, mismatch_description):
+    mismatch_description \
+      .append_text("Actual checksum is ") \
+      .append_text(self.checksum)

http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
new file mode 100644
index 0000000..d8aa148
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit test for Bigquery verifier"""
+
+import logging
+import unittest
+
+from hamcrest import assert_that as hc_assert_that
+from mock import Mock, patch
+
+from apache_beam.io.gcp.tests import bigquery_matcher as bq_verifier
+from apache_beam.tests.test_utils import patch_retry
+
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import bigquery
+  from google.cloud.exceptions import NotFound
+except ImportError:
+  bigquery = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+@unittest.skipIf(bigquery is None, 'Bigquery dependencies are not installed.')
+class BigqueryMatcherTest(unittest.TestCase):
+
+  def setUp(self):
+    self._mock_result = Mock()
+    patch_retry(self, bq_verifier)
+
+  @patch.object(bigquery, 'Client')
+  def test_bigquery_matcher_success(self, mock_bigquery):
+    mock_query = Mock()
+    mock_client = mock_bigquery.return_value
+    mock_client.run_sync_query.return_value = mock_query
+    mock_query.fetch_data.return_value = ([], None, None)
+
+    matcher = bq_verifier.BigqueryMatcher(
+        'mock_project',
+        'mock_query',
+        'da39a3ee5e6b4b0d3255bfef95601890afd80709')
+    hc_assert_that(self._mock_result, matcher)
+
+  @patch.object(bigquery, 'Client')
+  def test_bigquery_matcher_query_run_error(self, mock_bigquery):
+    mock_query = Mock()
+    mock_client = mock_bigquery.return_value
+    mock_client.run_sync_query.return_value = mock_query
+    mock_query.run.side_effect = ValueError('job is already running')
+
+    matcher = bq_verifier.BigqueryMatcher('mock_project',
+                                          'mock_query',
+                                          'mock_checksum')
+    with self.assertRaises(ValueError):
+      hc_assert_that(self._mock_result, matcher)
+    self.assertTrue(mock_query.run.called)
+    self.assertEqual(bq_verifier.MAX_RETRIES + 1, mock_query.run.call_count)
+
+  @patch.object(bigquery, 'Client')
+  def test_bigquery_matcher_fetch_data_error(self, mock_bigquery):
+    mock_query = Mock()
+    mock_client = mock_bigquery.return_value
+    mock_client.run_sync_query.return_value = mock_query
+    mock_query.fetch_data.side_effect = ValueError('query job not executed')
+
+    matcher = bq_verifier.BigqueryMatcher('mock_project',
+                                          'mock_query',
+                                          'mock_checksum')
+    with self.assertRaises(ValueError):
+      hc_assert_that(self._mock_result, matcher)
+    self.assertTrue(mock_query.fetch_data.called)
+    self.assertEqual(bq_verifier.MAX_RETRIES + 1,
+                     mock_query.fetch_data.call_count)
+
+  @patch.object(bigquery, 'Client')
+  def test_bigquery_matcher_query_responds_error_code(self, mock_bigquery):
+    mock_query = Mock()
+    mock_client = mock_bigquery.return_value
+    mock_client.run_sync_query.return_value = mock_query
+    mock_query.run.side_effect = NotFound('table is not found')
+
+    matcher = bq_verifier.BigqueryMatcher('mock_project',
+                                          'mock_query',
+                                          'mock_checksum')
+    with self.assertRaises(NotFound):
+      hc_assert_that(self._mock_result, matcher)
+    self.assertTrue(mock_query.run.called)
+    self.assertEqual(bq_verifier.MAX_RETRIES + 1, mock_query.run.call_count)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 41dfc07..379a96f 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -22,13 +22,13 @@ of test pipeline job. Customized verifier should extend
 `hamcrest.core.base_matcher.BaseMatcher` and override _matches.
 """
 
-import hashlib
 import logging
 
 from hamcrest.core.base_matcher import BaseMatcher
 
 from apache_beam.io.fileio import ChannelFactory
 from apache_beam.runners.runner import PipelineState
+from apache_beam.tests import test_utils as utils
 from apache_beam.utils import retry
 
 try:
@@ -76,7 +76,7 @@ class FileChecksumMatcher(BaseMatcher):
   """Matcher that verifies file(s) content by comparing file checksum.
 
   Use apache_beam.io.fileio to fetch file(s) from given path. File checksum
-  is a SHA-1 hash computed from content of file(s).
+  is a hash string computed from content of file(s).
   """
 
   def __init__(self, file_path, expected_checksum):
@@ -103,13 +103,9 @@ class FileChecksumMatcher(BaseMatcher):
     read_lines = self._read_with_retry()
 
     # Compute checksum
-    read_lines.sort()
-    m = hashlib.new('sha1')
-    for line in read_lines:
-      m.update(line)
-    self.checksum, num_lines = (m.hexdigest(), len(read_lines))
+    self.checksum = utils.compute_hash(read_lines)
     logging.info('Read from given path %s, %d lines, checksum: %s.',
-                 self.file_path, num_lines, self.checksum)
+                 self.file_path, len(read_lines), self.checksum)
     return self.checksum == self.expected_checksum
 
   def describe_to(self, description):

http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/tests/test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/test_utils.py b/sdks/python/apache_beam/tests/test_utils.py
index 3fdfe88..666207e 100644
--- a/sdks/python/apache_beam/tests/test_utils.py
+++ b/sdks/python/apache_beam/tests/test_utils.py
@@ -17,11 +17,23 @@
 
 """Utility methods for testing"""
 
+import hashlib
 import imp
 from mock import Mock, patch
 
 from apache_beam.utils import retry
 
+DEFAULT_HASHING_ALG = 'sha1'
+
+
+def compute_hash(content, hashing_alg=DEFAULT_HASHING_ALG):
+  """Compute a hash value from a list of string."""
+  content.sort()
+  m = hashlib.new(hashing_alg)
+  for elem in content:
+    m.update(str(elem))
+  return m.hexdigest()
+
 
 def patch_retry(testcase, module):
   """A function to patch retry module to use mock clock and logger.

http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 022d69d..cf210d9 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -87,6 +87,7 @@ REQUIRED_PACKAGES = [
     'avro>=1.7.7,<2.0.0',
     'crcmod>=1.7,<2.0',
     'dill==0.2.6',
+    'google-cloud-bigquery>=0.22.1,<1.0.0',
     'httplib2>=0.8,<0.10',
     'mock>=1.0.1,<3.0.0',
     'oauth2client>=2.0.1,<4.0.0',
@@ -102,6 +103,8 @@ GCP_REQUIREMENTS = [
   'google-apitools>=0.5.6,<1.0.0',
   'proto-google-cloud-datastore-v1==0.90.0',
   'googledatastore==7.0.0',
+  # GCP packages required by tests
+  'google-cloud-bigquery>=0.22.1,<0.23',
 ]
 
 
