This is an automated email from the ASF dual-hosted git repository.

udim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fe6c2c7  [BEAM-7463] Fix BQ IT flake with streaming inserts (#12951)
fe6c2c7 is described below

commit fe6c2c7b49a22f71526d2f949e9ad483eb898be3
Author: Udi Meiri <[email protected]>
AuthorDate: Mon Oct 5 10:25:15 2020 -0700

    [BEAM-7463] Fix BQ IT flake with streaming inserts (#12951)
    
    * [BEAM-7463] Fix BQ IT flake with streaming inserts
    
    - In short: streaming inserts are not immediately available after
      InsertAll RPC is done. See bug for details.
    - Added cumulative time limit option to retry mechanism. This may also
      be useful for ITs where the matcher needs to wait for a streaming
      pipeline to complete writing its output.
    
    * Fix lint
---
 .../io/gcp/big_query_query_to_table_it_test.py     |  7 +++-
 .../apache_beam/io/gcp/tests/bigquery_matcher.py   | 25 ++++++++++++-
 .../io/gcp/tests/bigquery_matcher_test.py          | 43 +++++++++++++++++++++-
 sdks/python/apache_beam/testing/test_utils.py      |  9 ++---
 sdks/python/apache_beam/utils/retry.py             | 26 +++++++++++--
 sdks/python/apache_beam/utils/retry_test.py        | 20 ++++++++++
 6 files changed, 114 insertions(+), 16 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index c806629..c20ed47 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -152,8 +152,9 @@ class BigQueryQueryToTableIT(unittest.TestCase):
     # handling the encoding in beam
     for row in table_data:
       row['bytes'] = base64.b64encode(row['bytes']).decode('utf-8')
-    self.bigquery_client.insert_rows(
+    passed, errors = self.bigquery_client.insert_rows(
         self.project, self.dataset_id, NEW_TYPES_INPUT_TABLE, table_data)
+    self.assertTrue(passed, 'Error in BQ setup: %s' % errors)
 
   @attr('IT')
   def test_big_query_legacy_sql(self):
@@ -293,7 +294,9 @@ class BigQueryQueryToTableIT(unittest.TestCase):
         BigqueryMatcher(
             project=self.project,
             query=verify_query,
-            checksum=expected_checksum)
+            checksum=expected_checksum,
+            timeout_secs=30,
+        )
     ]
     self._setup_new_types_env()
     extra_opts = {
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index 1a72d72..20ca128 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -63,13 +63,16 @@ class BigqueryMatcher(BaseMatcher):
   Fetch Bigquery data with given query, compute a hash string and compare
   with expected checksum.
   """
-  def __init__(self, project, query, checksum):
+  def __init__(self, project, query, checksum, timeout_secs=0):
     """Initialize BigQueryMatcher object.
     Args:
       project: The name (string) of the project.
       query: The query (string) to perform.
       checksum: SHA-1 hash generated from a sorted list of lines
         read from expected output.
+      timeout_secs: Duration to retry query until checksum matches. This
+        is useful for DF streaming pipelines or BQ streaming inserts. The
+        default (0) never retries.
     """
     if bigquery is None:
       raise ImportError('Bigquery dependencies are not installed.')
@@ -82,9 +85,16 @@ class BigqueryMatcher(BaseMatcher):
     self.query = query
     self.expected_checksum = checksum
     self.checksum = None
+    self.timeout_secs = timeout_secs
 
   def _matches(self, _):
-    if self.checksum is None:
+    @retry.with_exponential_backoff(
+        num_retries=1000,
+        initial_delay_secs=0.5,
+        max_delay_secs=30,
+        stop_after_secs=self.timeout_secs,
+    )
+    def get_checksum():
       response = self._query_with_retry()
       _LOGGER.info(
           'Read from given query (%s), total rows %d',
@@ -92,6 +102,17 @@ class BigqueryMatcher(BaseMatcher):
           len(response))
       self.checksum = compute_hash(response)
       _LOGGER.info('Generate checksum: %s', self.checksum)
+      if self.checksum != self.expected_checksum:
+        # This exception is never raised beyond the enclosing method.
+        raise ValueError(
+            'Checksums do not match. Expected: %s, got: %s' %
+            (self.expected_checksum, self.checksum))
+
+    if self.checksum is None:
+      try:
+        get_checksum()
+      except ValueError:
+        pass
 
     return self.checksum == self.expected_checksum
 
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
index 7d7e8cd..ec22317 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
@@ -57,14 +57,38 @@ class BigqueryMatcherTest(unittest.TestCase):
     mock_query_result[1].values.return_value = None
     mock_query_result[2].values.return_value = None
 
-    mock_bigquery.return_value.query.return_value.result.return_value = (
-        mock_query_result)
+    mock_query = mock_bigquery.return_value.query
+    mock_query.return_value.result.return_value = mock_query_result
 
     matcher = bq_verifier.BigqueryMatcher(
         'mock_project',
         'mock_query',
         '59f9d6bdee30d67ea73b8aded121c3a0280f9cd8')
     hc_assert_that(self._mock_result, matcher)
+    self.assertEqual(1, mock_query.call_count)
+
+  def test_bigquery_matcher_success_streaming_retry(self, mock_bigquery):
+    # Simulate case where a streaming insert takes time to process, such that
+    # the first query result is incomplete (empty).
+    empty_query_result = []
+    mock_query_result = [mock.Mock(), mock.Mock(), mock.Mock()]
+    mock_query_result[0].values.return_value = []
+    mock_query_result[1].values.return_value = None
+    mock_query_result[2].values.return_value = None
+
+    mock_query = mock_bigquery.return_value.query
+    mock_query.return_value.result.side_effect = [
+        empty_query_result, mock_query_result
+    ]
+
+    matcher = bq_verifier.BigqueryMatcher(
+        'mock_project',
+        'mock_query',
+        '59f9d6bdee30d67ea73b8aded121c3a0280f9cd8',
+        timeout_secs=5,
+    )
+    hc_assert_that(self._mock_result, matcher)
+    self.assertEqual(2, mock_query.call_count)
 
   def test_bigquery_matcher_query_error_retry(self, mock_bigquery):
     mock_query = mock_bigquery.return_value.query
@@ -76,6 +100,21 @@ class BigqueryMatcherTest(unittest.TestCase):
       hc_assert_that(self._mock_result, matcher)
     self.assertEqual(bq_verifier.MAX_RETRIES + 1, mock_query.call_count)
 
+  def test_bigquery_matcher_query_error_checksum(self, mock_bigquery):
+    empty_query_result = []
+
+    mock_query = mock_bigquery.return_value.query
+    mock_query.return_value.result.return_value = empty_query_result
+
+    matcher = bq_verifier.BigqueryMatcher(
+        'mock_project',
+        'mock_query',
+        '59f9d6bdee30d67ea73b8aded121c3a0280f9cd8',
+    )
+    with self.assertRaisesRegex(AssertionError, r'Expected checksum'):
+      hc_assert_that(self._mock_result, matcher)
+    self.assertEqual(1, mock_query.call_count)
+
 
 @unittest.skipIf(bigquery is None, 'Bigquery dependencies are not installed.')
 @mock.patch.object(bigquery_tools, 'BigQueryWrapper')
diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py
index 14bda88..cb81091 100644
--- a/sdks/python/apache_beam/testing/test_utils.py
+++ b/sdks/python/apache_beam/testing/test_utils.py
@@ -103,13 +103,10 @@ def patch_retry(testcase, module):
   """
   real_retry_with_exponential_backoff = retry.with_exponential_backoff
 
-  def patched_retry_with_exponential_backoff(num_retries, retry_filter):
+  def patched_retry_with_exponential_backoff(**kwargs):
     """A patch for retry decorator to use a mock dummy clock and logger."""
-    return real_retry_with_exponential_backoff(
-        num_retries=num_retries,
-        retry_filter=retry_filter,
-        logger=Mock(),
-        clock=Mock())
+    kwargs.update(logger=Mock(), clock=Mock())
+    return real_retry_with_exponential_backoff(**kwargs)
 
   patch.object(
       retry,
diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py
index 0831b24..0533f5b 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -87,6 +87,10 @@ class FuzzedExponentialIntervals(object):
     max_delay_secs: Maximum delay (in seconds). After this limit is reached,
       further tries use max_delay_sec instead of exponentially increasing
       the time. Defaults to 1 hour.
+    stop_after_secs: Places a limit on the sum of intervals returned (in
+      seconds), such that the sum is <= stop_after_secs. Defaults to disabled
+      (None). You may need to increase num_retries to effectively use this
+      feature.
   """
   def __init__(
       self,
@@ -94,7 +98,8 @@ class FuzzedExponentialIntervals(object):
       num_retries,
       factor=2,
       fuzz=0.5,
-      max_delay_secs=60 * 60 * 1):
+      max_delay_secs=60 * 60 * 1,
+      stop_after_secs=None):
     self._initial_delay_secs = initial_delay_secs
     if num_retries > 10000:
       raise ValueError('num_retries parameter cannot exceed 10000.')
@@ -104,12 +109,19 @@ class FuzzedExponentialIntervals(object):
       raise ValueError('fuzz parameter expected to be in [0, 1] range.')
     self._fuzz = fuzz
     self._max_delay_secs = max_delay_secs
+    self._stop_after_secs = stop_after_secs
 
   def __iter__(self):
     current_delay_secs = min(self._max_delay_secs, self._initial_delay_secs)
+    total_delay_secs = 0
     for _ in range(self._num_retries):
       fuzz_multiplier = 1 - self._fuzz + random.random() * self._fuzz
-      yield current_delay_secs * fuzz_multiplier
+      delay_secs = current_delay_secs * fuzz_multiplier
+      total_delay_secs += delay_secs
+      if (self._stop_after_secs is not None and
+          total_delay_secs > self._stop_after_secs):
+        break
+      yield delay_secs
       current_delay_secs = min(
           self._max_delay_secs, current_delay_secs * self._factor)
 
@@ -189,7 +201,8 @@ def with_exponential_backoff(
     clock=Clock(),
     fuzz=True,
     factor=2,
-    max_delay_secs=60 * 60):
+    max_delay_secs=60 * 60,
+    stop_after_secs=None):
   """Decorator with arguments that control the retry logic.
 
   Args:
@@ -211,6 +224,10 @@ def with_exponential_backoff(
     max_delay_secs: Maximum delay (in seconds). After this limit is reached,
       further tries use max_delay_sec instead of exponentially increasing
       the time. Defaults to 1 hour.
+    stop_after_secs: Places a limit on the sum of delays between retries, such
+      that the sum is <= stop_after_secs. Retries will stop after the limit is
+      reached. Defaults to disabled (None). You may need to increase num_retries
+      to effectively use this feature.
 
   Returns:
     As per Python decorators with arguments pattern returns a decorator
@@ -236,7 +253,8 @@ def with_exponential_backoff(
               num_retries,
               factor,
               fuzz=0.5 if fuzz else 0,
-              max_delay_secs=max_delay_secs))
+              max_delay_secs=max_delay_secs,
+              stop_after_secs=stop_after_secs))
       while True:
         try:
           return fun(*args, **kwargs)
diff --git a/sdks/python/apache_beam/utils/retry_test.py b/sdks/python/apache_beam/utils/retry_test.py
index c8b6b04..f931fbe 100644
--- a/sdks/python/apache_beam/utils/retry_test.py
+++ b/sdks/python/apache_beam/utils/retry_test.py
@@ -24,6 +24,8 @@ from __future__ import absolute_import
 import unittest
 from builtins import object
 
+from parameterized import parameterized
+
 from apache_beam.utils import retry
 
 # Protect against environments where apitools library is not available.
@@ -153,6 +155,24 @@ class RetryTest(unittest.TestCase):
     self.assertEqual(len(self.clock.calls), 7)
     self.assertEqual(self.clock.calls[0], 10.0)
 
+  @parameterized.expand([(str(i), i) for i in range(0, 1000, 47)])
+  def test_with_stop_after_secs(self, _, stop_after_secs):
+    max_delay_secs = 10
+    self.assertRaises(
+        NotImplementedError,
+        retry.with_exponential_backoff(
+            num_retries=10000,
+            initial_delay_secs=10.0,
+            clock=self.clock,
+            fuzz=False,
+            max_delay_secs=max_delay_secs,
+            stop_after_secs=stop_after_secs)(self.permanent_failure),
+        10,
+        b=20)
+    total_delay = sum(self.clock.calls)
+    self.assertLessEqual(total_delay, stop_after_secs)
+    self.assertGreaterEqual(total_delay, stop_after_secs - max_delay_secs)
+
   def test_log_calls_for_permanent_failure(self):
     self.assertRaises(
         NotImplementedError,

Reply via email to