This is an automated email from the ASF dual-hosted git repository.
udim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fe6c2c7 [BEAM-7463] Fix BQ IT flake with streaming inserts (#12951)
fe6c2c7 is described below
commit fe6c2c7b49a22f71526d2f949e9ad483eb898be3
Author: Udi Meiri <[email protected]>
AuthorDate: Mon Oct 5 10:25:15 2020 -0700
[BEAM-7463] Fix BQ IT flake with streaming inserts (#12951)
* [BEAM-7463] Fix BQ IT flake with streaming inserts
- In short: streaming inserts are not immediately available after
InsertAll RPC is done. See bug for details.
- Added cumulative time limit option to retry mechanism. This may also
be useful for ITs where the matcher needs to wait for a streaming
pipeline to complete writing its output.
* Fix lint
---
.../io/gcp/big_query_query_to_table_it_test.py | 7 +++-
.../apache_beam/io/gcp/tests/bigquery_matcher.py | 25 ++++++++++++-
.../io/gcp/tests/bigquery_matcher_test.py | 43 +++++++++++++++++++++-
sdks/python/apache_beam/testing/test_utils.py | 9 ++---
sdks/python/apache_beam/utils/retry.py | 26 +++++++++++--
sdks/python/apache_beam/utils/retry_test.py | 20 ++++++++++
6 files changed, 114 insertions(+), 16 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index c806629..c20ed47 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -152,8 +152,9 @@ class BigQueryQueryToTableIT(unittest.TestCase):
# handling the encoding in beam
for row in table_data:
row['bytes'] = base64.b64encode(row['bytes']).decode('utf-8')
- self.bigquery_client.insert_rows(
+ passed, errors = self.bigquery_client.insert_rows(
self.project, self.dataset_id, NEW_TYPES_INPUT_TABLE, table_data)
+ self.assertTrue(passed, 'Error in BQ setup: %s' % errors)
@attr('IT')
def test_big_query_legacy_sql(self):
@@ -293,7 +294,9 @@ class BigQueryQueryToTableIT(unittest.TestCase):
BigqueryMatcher(
project=self.project,
query=verify_query,
- checksum=expected_checksum)
+ checksum=expected_checksum,
+ timeout_secs=30,
+ )
]
self._setup_new_types_env()
extra_opts = {
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index 1a72d72..20ca128 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -63,13 +63,16 @@ class BigqueryMatcher(BaseMatcher):
Fetch Bigquery data with given query, compute a hash string and compare
with expected checksum.
"""
- def __init__(self, project, query, checksum):
+ def __init__(self, project, query, checksum, timeout_secs=0):
"""Initialize BigQueryMatcher object.
Args:
project: The name (string) of the project.
query: The query (string) to perform.
checksum: SHA-1 hash generated from a sorted list of lines
read from expected output.
+ timeout_secs: Duration to retry query until checksum matches. This
+ is useful for DF streaming pipelines or BQ streaming inserts. The
+ default (0) never retries.
"""
if bigquery is None:
raise ImportError('Bigquery dependencies are not installed.')
@@ -82,9 +85,16 @@ class BigqueryMatcher(BaseMatcher):
self.query = query
self.expected_checksum = checksum
self.checksum = None
+ self.timeout_secs = timeout_secs
def _matches(self, _):
- if self.checksum is None:
+ @retry.with_exponential_backoff(
+ num_retries=1000,
+ initial_delay_secs=0.5,
+ max_delay_secs=30,
+ stop_after_secs=self.timeout_secs,
+ )
+ def get_checksum():
response = self._query_with_retry()
_LOGGER.info(
'Read from given query (%s), total rows %d',
@@ -92,6 +102,17 @@ class BigqueryMatcher(BaseMatcher):
len(response))
self.checksum = compute_hash(response)
_LOGGER.info('Generate checksum: %s', self.checksum)
+ if self.checksum != self.expected_checksum:
+ # This exception is never raised beyond the enclosing method.
+ raise ValueError(
+ 'Checksums do not match. Expected: %s, got: %s' %
+ (self.expected_checksum, self.checksum))
+
+ if self.checksum is None:
+ try:
+ get_checksum()
+ except ValueError:
+ pass
return self.checksum == self.expected_checksum
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
index 7d7e8cd..ec22317 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
@@ -57,14 +57,38 @@ class BigqueryMatcherTest(unittest.TestCase):
mock_query_result[1].values.return_value = None
mock_query_result[2].values.return_value = None
- mock_bigquery.return_value.query.return_value.result.return_value = (
- mock_query_result)
+ mock_query = mock_bigquery.return_value.query
+ mock_query.return_value.result.return_value = mock_query_result
matcher = bq_verifier.BigqueryMatcher(
'mock_project',
'mock_query',
'59f9d6bdee30d67ea73b8aded121c3a0280f9cd8')
hc_assert_that(self._mock_result, matcher)
+ self.assertEqual(1, mock_query.call_count)
+
+ def test_bigquery_matcher_success_streaming_retry(self, mock_bigquery):
+ # Simulate case where a streaming insert takes time to process, such that
+ # the first query result is incomplete (empty).
+ empty_query_result = []
+ mock_query_result = [mock.Mock(), mock.Mock(), mock.Mock()]
+ mock_query_result[0].values.return_value = []
+ mock_query_result[1].values.return_value = None
+ mock_query_result[2].values.return_value = None
+
+ mock_query = mock_bigquery.return_value.query
+ mock_query.return_value.result.side_effect = [
+ empty_query_result, mock_query_result
+ ]
+
+ matcher = bq_verifier.BigqueryMatcher(
+ 'mock_project',
+ 'mock_query',
+ '59f9d6bdee30d67ea73b8aded121c3a0280f9cd8',
+ timeout_secs=5,
+ )
+ hc_assert_that(self._mock_result, matcher)
+ self.assertEqual(2, mock_query.call_count)
def test_bigquery_matcher_query_error_retry(self, mock_bigquery):
mock_query = mock_bigquery.return_value.query
@@ -76,6 +100,21 @@ class BigqueryMatcherTest(unittest.TestCase):
hc_assert_that(self._mock_result, matcher)
self.assertEqual(bq_verifier.MAX_RETRIES + 1, mock_query.call_count)
+ def test_bigquery_matcher_query_error_checksum(self, mock_bigquery):
+ empty_query_result = []
+
+ mock_query = mock_bigquery.return_value.query
+ mock_query.return_value.result.return_value = empty_query_result
+
+ matcher = bq_verifier.BigqueryMatcher(
+ 'mock_project',
+ 'mock_query',
+ '59f9d6bdee30d67ea73b8aded121c3a0280f9cd8',
+ )
+ with self.assertRaisesRegex(AssertionError, r'Expected checksum'):
+ hc_assert_that(self._mock_result, matcher)
+ self.assertEqual(1, mock_query.call_count)
+
@unittest.skipIf(bigquery is None, 'Bigquery dependencies are not installed.')
@mock.patch.object(bigquery_tools, 'BigQueryWrapper')
diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py
index 14bda88..cb81091 100644
--- a/sdks/python/apache_beam/testing/test_utils.py
+++ b/sdks/python/apache_beam/testing/test_utils.py
@@ -103,13 +103,10 @@ def patch_retry(testcase, module):
"""
real_retry_with_exponential_backoff = retry.with_exponential_backoff
- def patched_retry_with_exponential_backoff(num_retries, retry_filter):
+ def patched_retry_with_exponential_backoff(**kwargs):
"""A patch for retry decorator to use a mock dummy clock and logger."""
- return real_retry_with_exponential_backoff(
- num_retries=num_retries,
- retry_filter=retry_filter,
- logger=Mock(),
- clock=Mock())
+ kwargs.update(logger=Mock(), clock=Mock())
+ return real_retry_with_exponential_backoff(**kwargs)
patch.object(
retry,
diff --git a/sdks/python/apache_beam/utils/retry.py
b/sdks/python/apache_beam/utils/retry.py
index 0831b24..0533f5b 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -87,6 +87,10 @@ class FuzzedExponentialIntervals(object):
max_delay_secs: Maximum delay (in seconds). After this limit is reached,
further tries use max_delay_sec instead of exponentially increasing
the time. Defaults to 1 hour.
+ stop_after_secs: Places a limit on the sum of intervals returned (in
+ seconds), such that the sum is <= stop_after_secs. Defaults to disabled
+ (None). You may need to increase num_retries to effectively use this
+ feature.
"""
def __init__(
self,
@@ -94,7 +98,8 @@ class FuzzedExponentialIntervals(object):
num_retries,
factor=2,
fuzz=0.5,
- max_delay_secs=60 * 60 * 1):
+ max_delay_secs=60 * 60 * 1,
+ stop_after_secs=None):
self._initial_delay_secs = initial_delay_secs
if num_retries > 10000:
raise ValueError('num_retries parameter cannot exceed 10000.')
@@ -104,12 +109,19 @@ class FuzzedExponentialIntervals(object):
raise ValueError('fuzz parameter expected to be in [0, 1] range.')
self._fuzz = fuzz
self._max_delay_secs = max_delay_secs
+ self._stop_after_secs = stop_after_secs
def __iter__(self):
current_delay_secs = min(self._max_delay_secs, self._initial_delay_secs)
+ total_delay_secs = 0
for _ in range(self._num_retries):
fuzz_multiplier = 1 - self._fuzz + random.random() * self._fuzz
- yield current_delay_secs * fuzz_multiplier
+ delay_secs = current_delay_secs * fuzz_multiplier
+ total_delay_secs += delay_secs
+ if (self._stop_after_secs is not None and
+ total_delay_secs > self._stop_after_secs):
+ break
+ yield delay_secs
current_delay_secs = min(
self._max_delay_secs, current_delay_secs * self._factor)
@@ -189,7 +201,8 @@ def with_exponential_backoff(
clock=Clock(),
fuzz=True,
factor=2,
- max_delay_secs=60 * 60):
+ max_delay_secs=60 * 60,
+ stop_after_secs=None):
"""Decorator with arguments that control the retry logic.
Args:
@@ -211,6 +224,10 @@ def with_exponential_backoff(
max_delay_secs: Maximum delay (in seconds). After this limit is reached,
further tries use max_delay_sec instead of exponentially increasing
the time. Defaults to 1 hour.
stop_after_secs: Places a limit on the sum of delays between retries, such
that the sum is <= stop_after_secs. Retries will stop after the limit is
reached. Defaults to disabled (None). You may need to increase num_retries
to effectively use this feature.
Returns:
As per Python decorators with arguments pattern returns a decorator
@@ -236,7 +253,8 @@ def with_exponential_backoff(
num_retries,
factor,
fuzz=0.5 if fuzz else 0,
- max_delay_secs=max_delay_secs))
+ max_delay_secs=max_delay_secs,
+ stop_after_secs=stop_after_secs))
while True:
try:
return fun(*args, **kwargs)
diff --git a/sdks/python/apache_beam/utils/retry_test.py
b/sdks/python/apache_beam/utils/retry_test.py
index c8b6b04..f931fbe 100644
--- a/sdks/python/apache_beam/utils/retry_test.py
+++ b/sdks/python/apache_beam/utils/retry_test.py
@@ -24,6 +24,8 @@ from __future__ import absolute_import
import unittest
from builtins import object
+from parameterized import parameterized
+
from apache_beam.utils import retry
# Protect against environments where apitools library is not available.
@@ -153,6 +155,24 @@ class RetryTest(unittest.TestCase):
self.assertEqual(len(self.clock.calls), 7)
self.assertEqual(self.clock.calls[0], 10.0)
+ @parameterized.expand([(str(i), i) for i in range(0, 1000, 47)])
+ def test_with_stop_after_secs(self, _, stop_after_secs):
+ max_delay_secs = 10
+ self.assertRaises(
+ NotImplementedError,
+ retry.with_exponential_backoff(
+ num_retries=10000,
+ initial_delay_secs=10.0,
+ clock=self.clock,
+ fuzz=False,
+ max_delay_secs=max_delay_secs,
+ stop_after_secs=stop_after_secs)(self.permanent_failure),
+ 10,
+ b=20)
+ total_delay = sum(self.clock.calls)
+ self.assertLessEqual(total_delay, stop_after_secs)
+ self.assertGreaterEqual(total_delay, stop_after_secs - max_delay_secs)
+
def test_log_calls_for_permanent_failure(self):
self.assertRaises(
NotImplementedError,