This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f9e0969 [BEAM-7814] Make BigqueryMatcher wait for results. (#9154)
f9e0969 is described below
commit f9e09693dbddd71c6c0b0b48c5c7d0ab3147d4bf
Author: Udi Meiri <[email protected]>
AuthorDate: Mon Jul 29 09:54:23 2019 -0700
[BEAM-7814] Make BigqueryMatcher wait for results. (#9154)
* [BEAM-7814] Make BigqueryMatcher wait for results.
Also merges the code shared by BigqueryMatcher and BigqueryFullResultMatcher: BigqueryFullResultMatcher now subclasses BigqueryMatcher and reuses its _query_with_retry.
* Fixup: bigquery_matcher_test fixes
---
.../apache_beam/io/gcp/tests/bigquery_matcher.py | 57 ++++++++--------------
.../io/gcp/tests/bigquery_matcher_test.py | 13 ++---
2 files changed, 27 insertions(+), 43 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index 3c0b4b3..2180b86 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -78,29 +78,29 @@ class BigqueryMatcher(BaseMatcher):
self.project = project
self.query = query
self.expected_checksum = checksum
+ self.checksum = None
def _matches(self, _):
- logging.info('Start verify Bigquery data.')
- # Run query
- bigquery_client = bigquery.Client(project=self.project)
- response = self._query_with_retry(bigquery_client)
- logging.info('Read from given query (%s), total rows %d',
- self.query, len(response))
+ if self.checksum is None:
+ response = self._query_with_retry()
+ logging.info('Read from given query (%s), total rows %d',
+ self.query, len(response))
- # Compute checksum
self.checksum = compute_hash(response)
logging.info('Generate checksum: %s', self.checksum)
-
- # Verify result
return self.checksum == self.expected_checksum
@retry.with_exponential_backoff(
num_retries=MAX_RETRIES,
retry_filter=retry_on_http_and_value_error)
- def _query_with_retry(self, bigquery_client):
+ def _query_with_retry(self):
"""Run Bigquery query with retry if got error http response"""
+ logging.info('Attempting to perform query %s to BQ', self.query)
+ # Create client here since it throws an exception if pickled.
+ bigquery_client = bigquery.Client(self.project)
query_job = bigquery_client.query(self.query)
- return [row.values() for row in query_job]
+ rows = query_job.result(timeout=60)
+ return [row.values() for row in rows]
def describe_to(self, description):
description \
@@ -113,7 +113,7 @@ class BigqueryMatcher(BaseMatcher):
.append_text(self.checksum)
-class BigqueryFullResultMatcher(BaseMatcher):
+class BigqueryFullResultMatcher(BigqueryMatcher):
"""Matcher that verifies Bigquery data with given query.
Fetch Bigquery data with given query, compare to the expected data.
@@ -126,41 +126,24 @@ class BigqueryFullResultMatcher(BaseMatcher):
query: The query (string) to perform.
data: List of tuples with the expected data.
"""
- if bigquery is None:
- raise ImportError(
- 'Bigquery dependencies are not installed.')
- if not query or not isinstance(query, str):
- raise ValueError(
- 'Invalid argument: query. Please use non-empty string')
- self.project = project
- self.query = query
+ super(BigqueryFullResultMatcher, self).__init__(project, query,
+ 'unused_checksum')
self.expected_data = data
self.actual_data = None
def _matches(self, _):
if self.actual_data is None:
- bigquery_client = bigquery.Client(project=self.project)
- self.actual_data = self._get_query_result(bigquery_client)
+ self.actual_data = self._get_query_result()
+ logging.info('Result of query is: %r', self.actual_data)
- # Verify result
try:
equal_to(self.expected_data)(self.actual_data)
return True
except BeamAssertException:
return False
- def _get_query_result(self, bigquery_client):
- return self._query_with_retry(bigquery_client)
-
- @retry.with_exponential_backoff(
- num_retries=MAX_RETRIES,
- retry_filter=retry_on_http_and_value_error)
- def _query_with_retry(self, bigquery_client):
- """Run Bigquery query with retry if got error http response"""
- logging.info('Attempting to perform query %s to BQ', self.query)
- rows = bigquery_client.query(self.query).result(timeout=60)
- logging.info('Result of query is: %r', rows)
- return [row.values() for row in rows]
+ def _get_query_result(self):
+ return self._query_with_retry()
def describe_to(self, description):
description \
@@ -190,10 +173,10 @@ class BigqueryFullResultStreamingMatcher(BigqueryFullResultMatcher):
project, query, data)
self.timeout = timeout
- def _get_query_result(self, bigquery_client):
+ def _get_query_result(self):
start_time = time.time()
while time.time() - start_time <= self.timeout:
- response = self._query_with_retry(bigquery_client)
+ response = self._query_with_retry()
if len(response) >= len(self.expected_data):
return response
logging.debug('Query result contains %d rows' % len(response))
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
index 2a0db08..3e034d6 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
@@ -56,7 +56,8 @@ class BigqueryMatcherTest(unittest.TestCase):
mock_query_result[1].values.return_value = None
mock_query_result[2].values.return_value = None
- mock_bigquery.return_value.query.return_value = mock_query_result
+ mock_bigquery.return_value.query.return_value.result.return_value = (
+ mock_query_result)
matcher = bq_verifier.BigqueryMatcher(
'mock_project',
@@ -121,18 +122,18 @@ class BigqueryTableMatcherTest(unittest.TestCase):
class BigqueryFullResultStreamingMatcher(unittest.TestCase):
def setUp(self):
- self.timeout = 5
+ self.timeout = 0.01
def test__get_query_result_timeout(self, mock__query_with_retry):
- mock__query_with_retry.side_effect = lambda _: []
+ mock__query_with_retry.side_effect = lambda: []
matcher = bq_verifier.BigqueryFullResultStreamingMatcher(
'some-project', 'some-query', [1, 2, 3], timeout=self.timeout)
if sys.version_info >= (3,):
- with self.assertRaises(TimeoutError): # noqa: F821
- matcher._get_query_result(None)
+ with self.assertRaises(TimeoutError): # noqa: F821
+ matcher._get_query_result()
else:
with self.assertRaises(RuntimeError):
- matcher._get_query_result(None)
+ matcher._get_query_result()
if __name__ == '__main__':