This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f9e0969  [BEAM-7814] Make BigqueryMatcher wait for results. (#9154)
f9e0969 is described below

commit f9e09693dbddd71c6c0b0b48c5c7d0ab3147d4bf
Author: Udi Meiri <[email protected]>
AuthorDate: Mon Jul 29 09:54:23 2019 -0700

    [BEAM-7814] Make BigqueryMatcher wait for results. (#9154)
    
    * [BEAM-7814] Make BigqueryMatcher wait for results.
    
    Also merges BigqueryMatcher and BigqueryFullResultMatcher shared code.
    
    * Fixup: bigquery_matcher_test fixes
---
 .../apache_beam/io/gcp/tests/bigquery_matcher.py   | 57 ++++++++--------------
 .../io/gcp/tests/bigquery_matcher_test.py          | 13 ++---
 2 files changed, 27 insertions(+), 43 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index 3c0b4b3..2180b86 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -78,29 +78,29 @@ class BigqueryMatcher(BaseMatcher):
     self.project = project
     self.query = query
     self.expected_checksum = checksum
+    self.checksum = None
 
   def _matches(self, _):
-    logging.info('Start verify Bigquery data.')
-    # Run query
-    bigquery_client = bigquery.Client(project=self.project)
-    response = self._query_with_retry(bigquery_client)
-    logging.info('Read from given query (%s), total rows %d',
-                 self.query, len(response))
+    if self.checksum is None:
+      response = self._query_with_retry()
+      logging.info('Read from given query (%s), total rows %d',
+                   self.query, len(response))
 
-    # Compute checksum
     self.checksum = compute_hash(response)
     logging.info('Generate checksum: %s', self.checksum)
-
-    # Verify result
     return self.checksum == self.expected_checksum
 
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
       retry_filter=retry_on_http_and_value_error)
-  def _query_with_retry(self, bigquery_client):
+  def _query_with_retry(self):
     """Run Bigquery query with retry if got error http response"""
+    logging.info('Attempting to perform query %s to BQ', self.query)
+    # Create client here since it throws an exception if pickled.
+    bigquery_client = bigquery.Client(self.project)
     query_job = bigquery_client.query(self.query)
-    return [row.values() for row in query_job]
+    rows = query_job.result(timeout=60)
+    return [row.values() for row in rows]
 
   def describe_to(self, description):
     description \
@@ -113,7 +113,7 @@ class BigqueryMatcher(BaseMatcher):
       .append_text(self.checksum)
 
 
-class BigqueryFullResultMatcher(BaseMatcher):
+class BigqueryFullResultMatcher(BigqueryMatcher):
   """Matcher that verifies Bigquery data with given query.
 
   Fetch Bigquery data with given query, compare to the expected data.
@@ -126,41 +126,24 @@ class BigqueryFullResultMatcher(BaseMatcher):
       query: The query (string) to perform.
       data: List of tuples with the expected data.
     """
-    if bigquery is None:
-      raise ImportError(
-          'Bigquery dependencies are not installed.')
-    if not query or not isinstance(query, str):
-      raise ValueError(
-          'Invalid argument: query. Please use non-empty string')
-    self.project = project
-    self.query = query
+    super(BigqueryFullResultMatcher, self).__init__(project, query,
+                                                    'unused_checksum')
     self.expected_data = data
     self.actual_data = None
 
   def _matches(self, _):
     if self.actual_data is None:
-      bigquery_client = bigquery.Client(project=self.project)
-      self.actual_data = self._get_query_result(bigquery_client)
+      self.actual_data = self._get_query_result()
+      logging.info('Result of query is: %r', self.actual_data)
 
-    # Verify result
     try:
       equal_to(self.expected_data)(self.actual_data)
       return True
     except BeamAssertException:
       return False
 
-  def _get_query_result(self, bigquery_client):
-    return self._query_with_retry(bigquery_client)
-
-  @retry.with_exponential_backoff(
-      num_retries=MAX_RETRIES,
-      retry_filter=retry_on_http_and_value_error)
-  def _query_with_retry(self, bigquery_client):
-    """Run Bigquery query with retry if got error http response"""
-    logging.info('Attempting to perform query %s to BQ', self.query)
-    rows = bigquery_client.query(self.query).result(timeout=60)
-    logging.info('Result of query is: %r', rows)
-    return [row.values() for row in rows]
+  def _get_query_result(self):
+    return self._query_with_retry()
 
   def describe_to(self, description):
     description \
@@ -190,10 +173,10 @@ class BigqueryFullResultStreamingMatcher(BigqueryFullResultMatcher):
         project, query, data)
     self.timeout = timeout
 
-  def _get_query_result(self, bigquery_client):
+  def _get_query_result(self):
     start_time = time.time()
     while time.time() - start_time <= self.timeout:
-      response = self._query_with_retry(bigquery_client)
+      response = self._query_with_retry()
       if len(response) >= len(self.expected_data):
         return response
       logging.debug('Query result contains %d rows' % len(response))
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
index 2a0db08..3e034d6 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
@@ -56,7 +56,8 @@ class BigqueryMatcherTest(unittest.TestCase):
     mock_query_result[1].values.return_value = None
     mock_query_result[2].values.return_value = None
 
-    mock_bigquery.return_value.query.return_value = mock_query_result
+    mock_bigquery.return_value.query.return_value.result.return_value = (
+        mock_query_result)
 
     matcher = bq_verifier.BigqueryMatcher(
         'mock_project',
@@ -121,18 +122,18 @@ class BigqueryTableMatcherTest(unittest.TestCase):
 class BigqueryFullResultStreamingMatcher(unittest.TestCase):
 
   def setUp(self):
-    self.timeout = 5
+    self.timeout = 0.01
 
   def test__get_query_result_timeout(self, mock__query_with_retry):
-    mock__query_with_retry.side_effect = lambda _: []
+    mock__query_with_retry.side_effect = lambda: []
     matcher = bq_verifier.BigqueryFullResultStreamingMatcher(
         'some-project', 'some-query', [1, 2, 3], timeout=self.timeout)
     if sys.version_info >= (3,):
-      with self.assertRaises(TimeoutError): # noqa: F821
-        matcher._get_query_result(None)
+      with self.assertRaises(TimeoutError):  # noqa: F821
+        matcher._get_query_result()
     else:
       with self.assertRaises(RuntimeError):
-        matcher._get_query_result(None)
+        matcher._get_query_result()
 
 
 if __name__ == '__main__':

Reply via email to