This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0f727927c01 Return errors for insert_rows_json exceptions (#21080) (#28091)
0f727927c01 is described below
commit 0f727927c0136c63e8127ab5e9d3279fd5748dc4
Author: Adam Whitmore <[email protected]>
AuthorDate: Tue Sep 19 15:05:46 2023 -0400
Return errors for insert_rows_json exceptions (#21080) (#28091)
---
CHANGES.md | 1 +
sdks/python/apache_beam/io/gcp/bigquery_test.py | 853 +++++++++++++++------
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 22 +-
.../apache_beam/io/gcp/bigquery_write_it_test.py | 51 +-
4 files changed, 681 insertions(+), 246 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 47c35fe3491..cdf93909cb6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -82,6 +82,7 @@
## Bugfixes
* Fixed exception chaining issue in GCS connector (Python)
([#26769](https://github.com/apache/beam/issues/26769#issuecomment-1700422615)).
+* Fixed streaming inserts exception handling: GoogleAPICallErrors are now retried according to the retry strategy and routed to failed rows where appropriate rather than causing a pipeline error (Python) ([#21080](https://github.com/apache/beam/issues/21080)).
## Security Fixes
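For context on the entry above: a GoogleAPICallError raised during streaming inserts now surfaces on WriteToBigQuery's failed-rows outputs (subject to the configured retry strategy) instead of failing the whole pipeline. A minimal consumption sketch, with placeholder table and schema, mirroring the usage exercised by the tests below:

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery import BigQueryWriteFn, WriteToBigQuery
    from apache_beam.io.gcp.bigquery_tools import RetryStrategy

    with beam.Pipeline() as p:
      result = (
          p
          | beam.Create([{'columnA': 'value1'}])
          | WriteToBigQuery(
              table='project:dataset.table',  # placeholder
              schema={
                  'fields': [{
                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
                  }]
              },
              method='STREAMING_INSERTS',
              insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))
      # Rows whose error reason is non-transient, or that exhaust retries,
      # arrive here instead of raising and failing the pipeline.
      _ = result[BigQueryWriteFn.FAILED_ROWS] | beam.Map(print)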
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 3a3033dfcaf..7e9c1e63474 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -77,7 +77,6 @@ from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.utils import retry
# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
@@ -931,77 +930,269 @@ class TestWriteToBigQuery(unittest.TestCase):
'GCP dependencies are not installed')
class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
- # Using https://cloud.google.com/bigquery/docs/error-messages and
- # https://googleapis.dev/python/google-api-core/latest/_modules/google
- # /api_core/exceptions.html
- # to determine error types and messages to try for retriables.
+ # Running tests with a variety of exceptions from https://googleapis.dev
+ # /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+ # Choosing some exceptions that produce reasons included in
+ # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
@parameterized.expand([
+ # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+ # transient error retried and succeeds on second attempt, 0 rows sent to
+ # failed rows
param(
- exception_type=exceptions.Forbidden if exceptions else None,
- error_reason='rateLimitExceeded'),
+ insert_response=[
+ exceptions.TooManyRequests if exceptions else None,
+ None],
+ error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+ failed_rows=[]),
+ # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+ # failed rows after hitting max_retries
+ param(
+ insert_response=[
+ exceptions.InternalServerError if exceptions else None,
+ exceptions.InternalServerError if exceptions else None],
+ error_reason='Internal Server Error', # not in _NON_TRANSIENT_ERRORS
+ failed_rows=['value1', 'value3', 'value5']),
+ # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+ # failed_rows after hitting max_retries
+ param(
+ insert_response=[
+ exceptions.Forbidden if exceptions else None,
+ exceptions.Forbidden if exceptions else None],
+ error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+ failed_rows=['value1', 'value3', 'value5']),
+ ])
+ def test_insert_rows_json_exception_retry_always(
+ self, insert_response, error_reason, failed_rows):
+ # In this test, a pipeline will always retry all caught exception types
+ # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+ with mock.patch('time.sleep'):
+ call_counter = 0
+ mock_response = mock.Mock()
+ mock_response.reason = error_reason
+
+ def store_callback(table, **kwargs):
+ nonlocal call_counter
+ # raise exception if insert_response element is an exception
+ if insert_response[call_counter]:
+ exception_type = insert_response[call_counter]
+ call_counter += 1
+ raise exception_type('some exception', response=mock_response)
+ # return an empty list if the insert_response element is None,
+ # indicating a successful call to insert_rows_json
+ else:
+ call_counter += 1
+ return []
+
+ client = mock.Mock()
+ client.insert_rows_json.side_effect = store_callback
+
+ # Using the bundle based direct runner to avoid pickling problems
+ # with mocks.
+ with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+ bq_write_out = (
+ p
+ | beam.Create([{
+ 'columnA': 'value1', 'columnB': 'value2'
+ }, {
+ 'columnA': 'value3', 'columnB': 'value4'
+ }, {
+ 'columnA': 'value5', 'columnB': 'value6'
+ }])
+ # Using _StreamToBigQuery so that max_retries can be passed,
+ # limiting the run time of the test with RETRY_ALWAYS
+ | _StreamToBigQuery(
+ table_reference='project:dataset.table',
+ table_side_inputs=[],
+ schema_side_inputs=[],
+ schema='anyschema',
+ batch_size=None,
+ triggering_frequency=None,
+ create_disposition='CREATE_NEVER',
+ write_disposition=None,
+ kms_key=None,
+ retry_strategy=RetryStrategy.RETRY_ALWAYS,
+ additional_bq_parameters=[],
+ ignore_insert_ids=False,
+ ignore_unknown_columns=False,
+ with_auto_sharding=False,
+ test_client=client,
+ max_retries=len(insert_response) - 1,
+ num_streaming_keys=500))
+
+ failed_values = (
+ bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+ | beam.Map(lambda x: x[1]['columnA']))
+
+ assert_that(failed_values, equal_to(failed_rows))
+
+ # Running tests with a variety of exceptions from https://googleapis.dev
+ # /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+ # Choosing some exceptions that produce reasons that are included in
+ # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+ @parameterized.expand([
+ param(
+ # not in _NON_TRANSIENT_ERRORS
+ exception_type=exceptions.BadGateway if exceptions else None,
+ error_reason='Bad Gateway'),
+ param(
+ # in _NON_TRANSIENT_ERRORS
+ exception_type=exceptions.Unauthorized if exceptions else None,
+ error_reason='Unauthorized'),
+ ])
+ @mock.patch('time.sleep')
+ @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+ def test_insert_rows_json_exception_retry_never(
+ self, mock_send, unused_mock_sleep, exception_type, error_reason):
+ # In this test, a pipeline will never retry caught exception types
+ # since RetryStrategy is set to RETRY_NEVER
+ mock_response = mock.Mock()
+ mock_response.reason = error_reason
+ mock_send.side_effect = [
+ exception_type('some exception', response=mock_response)
+ ]
+
+ with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+ bq_write_out = (
+ p
+ | beam.Create([{
+ 'columnA': 'value1'
+ }, {
+ 'columnA': 'value2'
+ }])
+ | WriteToBigQuery(
+ table='project:dataset.table',
+ schema={
+ 'fields': [{
+ 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+ }]
+ },
+ create_disposition='CREATE_NEVER',
+ method='STREAMING_INSERTS',
+ insert_retry_strategy=RetryStrategy.RETRY_NEVER))
+ failed_values = (
+ bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+ | beam.Map(lambda x: x[1]['columnA']))
+
+ assert_that(failed_values, equal_to(['value1', 'value2']))
+
+ self.assertEqual(1, mock_send.call_count)
+
+ # Running tests with a variety of exceptions from https://googleapis.dev
+ # /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+ # Choosing some exceptions that produce reasons that are included in
+ # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+ @parameterized.expand([
param(
exception_type=exceptions.DeadlineExceeded if exceptions else None,
- error_reason='somereason'),
+ error_reason='Deadline Exceeded', # not in _NON_TRANSIENT_ERRORS
+ failed_values=[],
+ expected_call_count=2),
param(
- exception_type=exceptions.ServiceUnavailable if exceptions else None,
- error_reason='backendError'),
+ exception_type=exceptions.Conflict if exceptions else None,
+ error_reason='Conflict', # not in _NON_TRANSIENT_ERRORS
+ failed_values=[],
+ expected_call_count=2),
param(
- exception_type=exceptions.InternalServerError if exceptions else None,
- error_reason='internalError'),
+ exception_type=exceptions.TooManyRequests if exceptions else None,
+ error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+ failed_values=[],
+ expected_call_count=2),
param(
exception_type=exceptions.InternalServerError if exceptions else None,
- error_reason='backendError'),
+ error_reason='Internal Server Error', # not in _NON_TRANSIENT_ERRORS
+ failed_values=[],
+ expected_call_count=2),
+ param(
+ exception_type=exceptions.BadGateway if exceptions else None,
+ error_reason='Bad Gateway', # not in _NON_TRANSIENT_ERRORS
+ failed_values=[],
+ expected_call_count=2),
+ param(
+ exception_type=exceptions.ServiceUnavailable if exceptions else None,
+ error_reason='Service Unavailable', # not in _NON_TRANSIENT_ERRORS
+ failed_values=[],
+ expected_call_count=2),
+ param(
+ exception_type=exceptions.GatewayTimeout if exceptions else None,
+ error_reason='Gateway Timeout', # not in _NON_TRANSIENT_ERRORS
+ failed_values=[],
+ expected_call_count=2),
+ param(
+ exception_type=exceptions.BadRequest if exceptions else None,
+ error_reason='Bad Request', # in _NON_TRANSIENT_ERRORS
+ failed_values=['value1', 'value2'],
+ expected_call_count=1),
+ param(
+ exception_type=exceptions.Unauthorized if exceptions else None,
+ error_reason='Unauthorized', # in _NON_TRANSIENT_ERRORS
+ failed_values=['value1', 'value2'],
+ expected_call_count=1),
+ param(
+ exception_type=exceptions.Forbidden if exceptions else None,
+ error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+ failed_values=['value1', 'value2'],
+ expected_call_count=1),
+ param(
+ exception_type=exceptions.NotFound if exceptions else None,
+ error_reason='Not Found', # in _NON_TRANSIENT_ERRORS
+ failed_values=['value1', 'value2'],
+ expected_call_count=1),
+ param(
+ exception_type=exceptions.MethodNotImplemented
+ if exceptions else None,
+ error_reason='Not Implemented', # in _NON_TRANSIENT_ERRORS
+ failed_values=['value1', 'value2'],
+ expected_call_count=1),
])
@mock.patch('time.sleep')
@mock.patch('google.cloud.bigquery.Client.insert_rows_json')
- def test_insert_all_retries_if_structured_retriable(
+ def test_insert_rows_json_exception_retry_on_transient_error(
self,
mock_send,
unused_mock_sleep,
- exception_type=None,
- error_reason=None):
- # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
+ exception_type,
+ error_reason,
+ failed_values,
+ expected_call_count):
+ # In this test, a pipeline will only retry caught exception types
+ # with reasons that are not in _NON_TRANSIENT_ERRORS since RetryStrategy is
+ # set to RETRY_ON_TRANSIENT_ERROR
+ mock_response = mock.Mock()
+ mock_response.reason = error_reason
mock_send.side_effect = [
- exception_type(
- 'some retriable exception', errors=[{
- 'reason': error_reason
- }]),
- exception_type(
- 'some retriable exception', errors=[{
- 'reason': error_reason
- }]),
- exception_type(
- 'some retriable exception', errors=[{
- 'reason': error_reason
- }]),
- exception_type(
- 'some retriable exception', errors=[{
- 'reason': error_reason
- }]),
+ exception_type('some exception', response=mock_response),
+ # Return no exception and no errors on 2nd call, if there is a 2nd call
+ []
]
- with self.assertRaises(Exception) as exc:
- with beam.Pipeline() as p:
- _ = (
- p
- | beam.Create([{
- 'columnA': 'value1'
- }])
- | WriteToBigQuery(
- table='project:dataset.table',
- schema={
- 'fields': [{
- 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
- }]
- },
- create_disposition='CREATE_NEVER',
- method='STREAMING_INSERTS'))
- self.assertEqual(4, mock_send.call_count)
- self.assertIn('some retriable exception', exc.exception.args[0])
+ with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+ bq_write_out = (
+ p
+ | beam.Create([{
+ 'columnA': 'value1'
+ }, {
+ 'columnA': 'value2'
+ }])
+ | WriteToBigQuery(
+ table='project:dataset.table',
+ schema={
+ 'fields': [{
+ 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+ }]
+ },
+ create_disposition='CREATE_NEVER',
+ method='STREAMING_INSERTS',
+ insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))
+ failed_values_out = (
+ bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+ | beam.Map(lambda x: x[1]['columnA']))
+
+ assert_that(failed_values_out, equal_to(failed_values))
+ self.assertEqual(expected_call_count, mock_send.call_count)
- # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
- # /api_core/exceptions.html
- # to determine error types and messages to try for retriables.
+ # Running tests with persistent exceptions with exception types not
+ # caught in BigQueryWrapper._insert_all_rows but retriable by
+ # retry.with_exponential_backoff
@parameterized.expand([
param(
exception_type=requests.exceptions.ConnectionError,
@@ -1009,28 +1200,18 @@ class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
param(
exception_type=requests.exceptions.Timeout,
error_message='some timeout error'),
- param(
- exception_type=ConnectionError,
- error_message='some py connection error'),
- param(
- exception_type=exceptions.BadGateway if exceptions else None,
- error_message='some badgateway error'),
])
@mock.patch('time.sleep')
@mock.patch('google.cloud.bigquery.Client.insert_rows_json')
- def test_insert_all_retries_if_unstructured_retriable(
- self,
- mock_send,
- unused_mock_sleep,
- exception_type=None,
- error_message=None):
- # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
- mock_send.side_effect = [
- exception_type(error_message),
- exception_type(error_message),
- exception_type(error_message),
- exception_type(error_message),
- ]
+ def test_insert_rows_json_persistent_retriable_exception(
+ self, mock_send, unused_mock_sleep, exception_type, error_message):
+ # In this test, each insert_rows_json call will result in an exception
+ # and be retried with retry.with_exponential_backoff until MAX_RETRIES is
+ # reached
+ mock_send.side_effect = exception_type(error_message)
+
+ # Expecting 1 initial call plus maximum number of retries
+ expected_call_count = 1 + bigquery_tools.MAX_RETRIES
with self.assertRaises(Exception) as exc:
with beam.Pipeline() as p:
@@ -1038,6 +1219,8 @@ class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
p
| beam.Create([{
'columnA': 'value1'
+ }, {
+ 'columnA': 'value2'
}])
| WriteToBigQuery(
table='project:dataset.table',
@@ -1048,138 +1231,390 @@ class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
},
create_disposition='CREATE_NEVER',
method='STREAMING_INSERTS'))
- self.assertEqual(4, mock_send.call_count)
+
+ self.assertEqual(expected_call_count, mock_send.call_count)
self.assertIn(error_message, exc.exception.args[0])
- # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
- # /api_core/exceptions.html
- # to determine error types and messages to try for retriables.
+ # Running tests with intermittent exceptions with exception types not
+ # caught in BigQueryWrapper._insert_all_rows but retriable by
+ # retry.with_exponential_backoff
@parameterized.expand([
param(
- exception_type=retry.PermanentException,
- error_args=('nonretriable', )),
- param(
- exception_type=exceptions.BadRequest if exceptions else None,
- error_args=(
- 'forbidden morbidden', [{
- 'reason': 'nonretriablereason'
- }])),
- param(
- exception_type=exceptions.BadRequest if exceptions else None,
- error_args=('BAD REQUEST!', [{
- 'reason': 'nonretriablereason'
- }])),
- param(
- exception_type=exceptions.MethodNotAllowed if exceptions else None,
- error_args=(
- 'method not allowed!', [{
- 'reason': 'nonretriablereason'
- }])),
- param(
- exception_type=exceptions.MethodNotAllowed if exceptions else None,
- error_args=('method not allowed!', 'args')),
- param(
- exception_type=exceptions.Unknown if exceptions else None,
- error_args=('unknown!', 'args')),
+ exception_type=requests.exceptions.ConnectionError,
+ error_message='some connection error'),
param(
- exception_type=exceptions.Aborted if exceptions else None,
- error_args=('abortet!', 'abort')),
+ exception_type=requests.exceptions.Timeout,
+ error_message='some timeout error'),
])
@mock.patch('time.sleep')
@mock.patch('google.cloud.bigquery.Client.insert_rows_json')
- def test_insert_all_unretriable_errors(
- self, mock_send, unused_mock_sleep, exception_type=None, error_args=None):
- # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+ def test_insert_rows_json_intermittent_retriable_exception(
+ self, mock_send, unused_mock_sleep, exception_type, error_message):
+ # In this test, the first 2 insert_rows_json calls will result in an
+ # exception and be retried with retry.with_exponential_backoff. The last
+ # call will not raise an exception and will succeed.
mock_send.side_effect = [
- exception_type(*error_args),
- exception_type(*error_args),
- exception_type(*error_args),
- exception_type(*error_args),
+ exception_type(error_message), exception_type(error_message), []
]
- with self.assertRaises(Exception):
- with beam.Pipeline() as p:
- _ = (
+ with beam.Pipeline() as p:
+ _ = (
+ p
+ | beam.Create([{
+ 'columnA': 'value1'
+ }, {
+ 'columnA': 'value2'
+ }])
+ | WriteToBigQuery(
+ table='project:dataset.table',
+ schema={
+ 'fields': [{
+ 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+ }]
+ },
+ create_disposition='CREATE_NEVER',
+ method='STREAMING_INSERTS'))
+
+ self.assertEqual(3, mock_send.call_count)
+
+ # Running tests with a variety of error reasons from
+ # https://cloud.google.com/bigquery/docs/error-messages
+ # This covers the scenario when
+ # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+ # rather than raising an exception.
+ # Choosing some error reasons that are included in
+ # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+ @parameterized.expand([
+ # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
+ param(
+ insert_response=[
+ [{
+ 'index': 0, 'errors': [{
+ 'reason': 'invalid'
+ }]
+ }],
+ [{
+ 'index': 0, 'errors': [{
+ 'reason': 'invalid'
+ }]
+ }],
+ ],
+ failed_rows=['value1']),
+ # reason in _NON_TRANSIENT_ERRORS for row 1
+ # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st run
+ # row 1 sent to failed_rows
+ param(
+ insert_response=[
+ [{
+ 'index': 0, 'errors': [{
+ 'reason': 'invalid'
+ }]
+ }, {
+ 'index': 1, 'errors': [{
+ 'reason': 'internalError'
+ }]
+ }],
+ [{
+ 'index': 0, 'errors': [{
+ 'reason': 'invalid'
+ }]
+ }],
+ ],
+ failed_rows=['value1']),
+ # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+ # transient error succeeds on second attempt, 0 rows sent to failed rows
+ param(
+ insert_response=[
+ [{
+ 'index': 0, 'errors': [{
+ 'reason': 'internalError'
+ }]
+ }],
+ [],
+ ],
+ failed_rows=[]),
+ ])
+ def test_insert_rows_json_errors_retry_always(
+ self, insert_response, failed_rows, unused_sleep_mock=None):
+ # In this test, a pipeline will always retry all errors
+ # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+ with mock.patch('time.sleep'):
+ call_counter = 0
+
+ def store_callback(table, **kwargs):
+ nonlocal call_counter
+ response = insert_response[call_counter]
+ call_counter += 1
+ return response
+
+ client = mock.Mock()
+ client.insert_rows_json = mock.Mock(side_effect=store_callback)
+
+ # Using the bundle based direct runner to avoid pickling problems
+ # with mocks.
+ with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+ bq_write_out = (
p
| beam.Create([{
- 'columnA': 'value1'
+ 'columnA': 'value1', 'columnB': 'value2'
+ }, {
+ 'columnA': 'value3', 'columnB': 'value4'
+ }, {
+ 'columnA': 'value5', 'columnB': 'value6'
}])
- | WriteToBigQuery(
- table='project:dataset.table',
- schema={
- 'fields': [{
- 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
- }]
- },
+ # Using _StreamToBigQuery so that max_retries can be passed,
+ # limiting the run time of the test with RETRY_ALWAYS
+ | _StreamToBigQuery(
+ table_reference='project:dataset.table',
+ table_side_inputs=[],
+ schema_side_inputs=[],
+ schema='anyschema',
+ batch_size=None,
+ triggering_frequency=None,
create_disposition='CREATE_NEVER',
- method='STREAMING_INSERTS'))
- self.assertEqual(1, mock_send.call_count)
+ write_disposition=None,
+ kms_key=None,
+ retry_strategy=RetryStrategy.RETRY_ALWAYS,
+ additional_bq_parameters=[],
+ ignore_insert_ids=False,
+ ignore_unknown_columns=False,
+ with_auto_sharding=False,
+ test_client=client,
+ max_retries=len(insert_response) - 1,
+ num_streaming_keys=500))
- # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
- # /api_core/exceptions.html
- # to determine error types and messages to try for retriables.
+ failed_values = (
+ bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+ | beam.Map(lambda x: x[1]['columnA']))
+
+ assert_that(failed_values, equal_to(failed_rows))
+
+ # Running tests with a variety of error reasons from
+ # https://cloud.google.com/bigquery/docs/error-messages
+ # This covers the scenario when
+ # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+ # rather than raising an exception.
+ # Choosing some error reasons that are included in
+ # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
@parameterized.expand([
+ # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
param(
- exception_type=retry.PermanentException,
- error_args=('nonretriable', )),
- param(
- exception_type=exceptions.BadRequest if exceptions else None,
- error_args=(
- 'forbidden morbidden', [{
- 'reason': 'nonretriablereason'
- }])),
+ insert_response=[
+ [{
+ 'index': 0, 'errors': [{
+ 'reason': 'invalidQuery'
+ }]
+ }],
+ ],
+ streaming=False),
+ # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
param(
- exception_type=exceptions.BadRequest if exceptions else None,
- error_args=('BAD REQUEST!', [{
- 'reason': 'nonretriablereason'
- }])),
+ insert_response=[
+ [{
+ 'index': 0, 'errors': [{
+ 'reason': 'internalError'
+ }]
+ }],
+ ],
+ streaming=False),
param(
- exception_type=exceptions.MethodNotAllowed if exceptions else None,
- error_args=(
- 'method not allowed!', [{
- 'reason': 'nonretriablereason'
- }])),
+ insert_response=[
+ [{
+ 'index': 0, 'errors': [{
+ 'reason': 'invalid'
+ }]
+ }],
+ ],
+ streaming=True),
+ # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
param(
- exception_type=exceptions.MethodNotAllowed if exceptions else None,
- error_args=('method not allowed!', 'args')),
+ insert_response=[
+ [{
+ 'index': 0, 'errors': [{
+ 'reason': 'internalError'
+ }]
+ }],
+ ],
+ streaming=True),
+ ])
+ @mock.patch('time.sleep')
+ @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+ def test_insert_rows_json_errors_retry_never(
+ self, mock_send, unused_mock_sleep, insert_response, streaming):
+ # In this test, a pipeline will never retry errors since RetryStrategy is
+ # set to RETRY_NEVER
+ mock_send.side_effect = insert_response
+ opt = StandardOptions()
+ opt.streaming = streaming
+ with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+ bq_write_out = (
+ p
+ | beam.Create([{
+ 'columnA': 'value1'
+ }, {
+ 'columnA': 'value2'
+ }])
+ | WriteToBigQuery(
+ table='project:dataset.table',
+ schema={
+ 'fields': [{
+ 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+ }]
+ },
+ create_disposition='CREATE_NEVER',
+ method='STREAMING_INSERTS',
+ insert_retry_strategy=RetryStrategy.RETRY_NEVER))
+ failed_values = (
+ bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+ | beam.Map(lambda x: x[1]['columnA']))
+
+ assert_that(failed_values, equal_to(['value1']))
+
+ self.assertEqual(1, mock_send.call_count)
+
+ # Running tests with a variety of error reasons from
+ # https://cloud.google.com/bigquery/docs/error-messages
+ # This covers the scenario when
+ # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+ # rather than raising an exception.
+ # Choosing some error reasons that are included in
+ # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+ @parameterized.expand([
+ # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
param(
- exception_type=exceptions.Unknown if exceptions else None,
- error_args=('unknown!', 'args')),
+ insert_response=[
+ [{
+ 'index': 0, 'errors': [{
+ 'reason': 'invalid'
+ }]
+ }],
+ ],
+ failed_rows=['value1'],
+ streaming=False),
+ # reason not in _NON_TRANSIENT_ERRORS for row 1 on 1st attempt
+ # transient error succeeds on 2nd attempt, 0 rows sent to failed rows
param(
- exception_type=exceptions.Aborted if exceptions else None,
- error_args=('abortet!', 'abort')),
+ insert_response=[
+ [{
+ 'index': 0, 'errors': [{
+ 'reason': 'internalError'
+ }]
+ }],
+ [],
+ ],
+ failed_rows=[],
+ streaming=False),
+ # reason in _NON_TRANSIENT_ERRORS for row 1
+ # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st and 2nd attempt
+ # all rows with errors are retried when any row has a retriable error
+ # row 1 sent to failed_rows after final attempt
param(
- exception_type=requests.exceptions.ConnectionError,
- error_args=('some connection error', )),
+ insert_response=[
+ [{
+ 'index': 0, 'errors': [{
+ 'reason': 'invalid'
+ }]
+ }, {
+ 'index': 1, 'errors': [{
+ 'reason': 'internalError'
+ }]
+ }],
+ [
+ {
+ 'index': 0, 'errors': [{
+ 'reason': 'invalid'
+ }]
+ },
+ ],
+ ],
+ failed_rows=['value1'],
+ streaming=False),
+ # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
param(
- exception_type=requests.exceptions.Timeout,
- error_args=('some timeout error', )),
+ insert_response=[
+ [{
+ 'index': 0, 'errors': [{
+ 'reason': 'invalid'
+ }]
+ }],
+ ],
+ failed_rows=['value1'],
+ streaming=True),
+ # reason not in _NON_TRANSIENT_ERRORS for row 1 on 1st attempt
+ # transient error succeeds on 2nd attempt, 0 rows sent to failed rows
param(
- exception_type=ConnectionError,
- error_args=('some py connection error', )),
+ insert_response=[
+ [{
+ 'index': 0, 'errors': [{
+ 'reason': 'internalError'
+ }]
+ }],
+ [],
+ ],
+ failed_rows=[],
+ streaming=True),
+ # reason in _NON_TRANSIENT_ERRORS for row 1
+ # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st and 2nd attempt
+ # all rows with errors are retried when any row has a retriable error
+ # row 1 sent to failed_rows after final attempt
param(
- exception_type=exceptions.BadGateway if exceptions else None,
- error_args=('some badgateway error', )),
+ insert_response=[
+ [{
+ 'index': 0, 'errors': [{
+ 'reason': 'invalid'
+ }]
+ }, {
+ 'index': 1, 'errors': [{
+ 'reason': 'internalError'
+ }]
+ }],
+ [
+ {
+ 'index': 0, 'errors': [{
+ 'reason': 'invalid'
+ }]
+ },
+ ],
+ ],
+ failed_rows=['value1'],
+ streaming=True),
])
@mock.patch('time.sleep')
@mock.patch('google.cloud.bigquery.Client.insert_rows_json')
- def test_insert_all_unretriable_errors_streaming(
- self, mock_send, unused_mock_sleep, exception_type=None, error_args=None):
- # In this test, a STREAMING pipeline will retry ALL errors, and never throw
- # an exception.
- mock_send.side_effect = [
- exception_type(*error_args),
- exception_type(*error_args),
- [] # Errors thrown twice, and then succeeded
- ]
+ def test_insert_rows_json_errors_retry_on_transient_error(
+ self,
+ mock_send,
+ unused_mock_sleep,
+ insert_response,
+ failed_rows,
+ streaming=False):
+ # In this test, a pipeline will only retry errors with reasons that are not
+ # in _NON_TRANSIENT_ERRORS since RetryStrategy is set to
+ # RETRY_ON_TRANSIENT_ERROR
+ call_counter = 0
+
+ def store_callback(table, **kwargs):
+ nonlocal call_counter
+ response = insert_response[call_counter]
+ call_counter += 1
+ return response
+
+ mock_send.side_effect = store_callback
opt = StandardOptions()
- opt.streaming = True
+ opt.streaming = streaming
+
+ # Using the bundle based direct runner to avoid pickling problems
+ # with mocks.
with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
- _ = (
+ bq_write_out = (
p
| beam.Create([{
'columnA': 'value1'
+ }, {
+ 'columnA': 'value2'
+ }, {
+ 'columnA': 'value3'
}])
| WriteToBigQuery(
table='project:dataset.table',
@@ -1189,8 +1624,14 @@ class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
}]
},
create_disposition='CREATE_NEVER',
- method='STREAMING_INSERTS'))
- self.assertEqual(3, mock_send.call_count)
+ method='STREAMING_INSERTS',
+ insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))
+
+ failed_values = (
+ bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+ | beam.Map(lambda x: x[1]['columnA']))
+
+ assert_that(failed_values, equal_to(failed_rows))
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
@@ -1499,76 +1940,6 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
result)
self.assertEqual(len(data1['colA_values']), 1)
- @parameterized.expand([
- param(retry_strategy=RetryStrategy.RETRY_ALWAYS),
- param(retry_strategy=RetryStrategy.RETRY_NEVER),
- param(retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR),
- ])
- def test_permanent_failure_in_some_rows_does_not_duplicate(
- self, unused_sleep_mock=None, retry_strategy=None):
- with mock.patch('time.sleep'):
-
- def store_callback(table, **kwargs):
- return [
- {
- 'index': 0,
- 'errors': [{
- 'reason': 'invalid'
- }, {
- 'reason': 'its bad'
- }]
- },
- ]
-
- client = mock.Mock()
- client.insert_rows_json = mock.Mock(side_effect=store_callback)
-
- # The expected rows to be inserted according to the insert strategy
- if retry_strategy == RetryStrategy.RETRY_NEVER:
- inserted_rows = ['value3', 'value5']
- else: # RETRY_ALWAYS and RETRY_ON_TRANSIENT_ERRORS should insert all rows
- inserted_rows = ['value3', 'value5']
-
- # Using the bundle based direct runner to avoid pickling problems
- # with mocks.
- with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
- bq_write_out = (
- p
- | beam.Create([{
- 'columnA': 'value1', 'columnB': 'value2'
- }, {
- 'columnA': 'value3', 'columnB': 'value4'
- }, {
- 'columnA': 'value5', 'columnB': 'value6'
- }])
- | _StreamToBigQuery(
- table_reference='project:dataset.table',
- table_side_inputs=[],
- schema_side_inputs=[],
- schema='anyschema',
- batch_size=None,
- triggering_frequency=None,
- create_disposition='CREATE_NEVER',
- write_disposition=None,
- kms_key=None,
- retry_strategy=retry_strategy,
- additional_bq_parameters=[],
- ignore_insert_ids=False,
- ignore_unknown_columns=False,
- with_auto_sharding=False,
- test_client=client,
- max_retries=10,
- num_streaming_keys=500))
-
- failed_values = (
- bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
- | beam.Map(lambda x: x[1]['columnA']))
-
- assert_that(
- failed_values,
- equal_to(
- list({'value1', 'value3', 'value5'}.difference(inserted_rows))))
-
@parameterized.expand([
param(with_auto_sharding=False),
param(with_auto_sharding=True),
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 07d711f8fc9..2f942079528 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -732,11 +732,13 @@ class BigQueryWrapper(object):
except (ClientError, GoogleAPICallError) as e:
# e.code contains the numeric http status code.
service_call_metric.call(e.code)
- # Re-reise the exception so that we re-try appropriately.
- raise
+ # Package exception with required fields
+ error = {'message': e.message, 'reason': e.response.reason}
+ # Add all rows to the errors list along with the error
+ errors = [{"index": i, "errors": [error]} for i, _ in enumerate(rows)]
except HttpError as e:
service_call_metric.call(e)
- # Re-reise the exception so that we re-try appropriately.
+ # Re-raise the exception so that we re-try appropriately.
raise
finally:
self._latency_histogram_metric.update(
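The behavioral core of the change is the ClientError/GoogleAPICallError branch in the hunk above: instead of re-raising, the wrapper packages the exception as a per-row error list in the same shape insert_rows_json itself returns, so the caller can apply the retry strategy row by row. A small sketch of the resulting structure for a two-row batch (message and reason values illustrative):

    rows = [{'columnA': 'value1'}, {'columnA': 'value2'}]
    error = {'message': 'some exception', 'reason': 'Forbidden'}
    errors = [{'index': i, 'errors': [error]} for i, _ in enumerate(rows)]
    # errors == [{'index': 0, 'errors': [error]},
    #            {'index': 1, 'errors': [error]}]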
@@ -1491,7 +1493,19 @@ class RetryStrategy(object):
RETRY_NEVER = 'RETRY_NEVER'
RETRY_ON_TRANSIENT_ERROR = 'RETRY_ON_TRANSIENT_ERROR'
- _NON_TRANSIENT_ERRORS = {'invalid', 'invalidQuery', 'notImplemented'}
+ # The values below may appear as reasons either in an error list
+ # returned by a client method or in an HTTP response, as defined
+ # in google.api_core.exceptions
+ _NON_TRANSIENT_ERRORS = {
+ 'invalid',
+ 'invalidQuery',
+ 'notImplemented',
+ 'Bad Request',
+ 'Unauthorized',
+ 'Forbidden',
+ 'Not Found',
+ 'Not Implemented',
+ }
@staticmethod
def should_retry(strategy, error_message):
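Note that the expanded set mixes two vocabularies: lowercase reasons such as 'invalid' come from the error lists insert_rows_json returns, while title-case phrases such as 'Not Found' come from e.response.reason on raised exceptions (see the _insert_all_rows hunk above). Both feed the same membership check in should_retry; a sketch of the expected routing, assuming the semantics exercised by the tests in this commit:

    from apache_beam.io.gcp.bigquery_tools import RetryStrategy

    # Non-transient reasons are never retried under RETRY_ON_TRANSIENT_ERROR,
    # whether they came from a returned error list or a raised exception.
    for reason in ('invalid', 'Not Found'):
      assert not RetryStrategy.should_retry(
          RetryStrategy.RETRY_ON_TRANSIENT_ERROR, reason)

    # Reasons outside the set are treated as transient and retried.
    assert RetryStrategy.should_retry(
        RetryStrategy.RETRY_ON_TRANSIENT_ERROR, 'Internal Server Error')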
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index c73d3ff7e53..4b728fe7ec1 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -379,7 +379,7 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
def test_big_query_write_insert_errors_reporting(self):
"""
Test that errors returned by beam.io.WriteToBigQuery
- contain both the failed rows amd the reason for it failing.
+ contain both the failed rows and the reason for it failing.
"""
table_name = 'python_write_table'
table_id = '{}.{}'.format(self.dataset_id, table_name)
@@ -454,6 +454,55 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
| 'ParseErrors' >> beam.Map(lambda err: (err[1], err[2])),
equal_to(bq_result_errors))
+ @pytest.mark.it_postcommit
+ def test_big_query_write_insert_non_transient_api_call_error(self):
+ """
+ Test that non-transient GoogleAPICallError exceptions raised
+ by beam.io.WriteToBigQuery are not retried and result in
+ FAILED_ROWS_WITH_ERRORS containing both the failed rows and
+ the reason for failure.
+ """
+ table_name = 'this_table_does_not_exist'
+ table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+ input_data = [{
+ 'number': 1,
+ 'str': 'some_string',
+ }]
+
+ table_schema = {
+ "fields": [{
+ "name": "number", "type": "INTEGER", 'mode': 'NULLABLE'
+ }, {
+ "name": "str", "type": "STRING", 'mode': 'NULLABLE'
+ }]
+ }
+
+ bq_result_errors = [({
+ 'number': 1,
+ 'str': 'some_string',
+ }, "Not Found")]
+
+ args = self.test_pipeline.get_full_options_as_args()
+
+ with beam.Pipeline(argv=args) as p:
+ # pylint: disable=expression-not-assigned
+ errors = (
+ p | 'create' >> beam.Create(input_data)
+ | 'write' >> beam.io.WriteToBigQuery(
+ table_id,
+ schema=table_schema,
+ method='STREAMING_INSERTS',
+ insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
+ create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
+ write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+
+ assert_that(
+ errors[BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+ |
+ 'ParseErrors' >> beam.Map(lambda err: (err[1], err[2][0]["reason"])),
+ equal_to(bq_result_errors))
+
@pytest.mark.it_postcommit
@parameterized.expand([
param(file_format=FileFormat.AVRO),