This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f727927c01 Return errors for insert_rows_json exceptions (#21080) 
(#28091)
0f727927c01 is described below

commit 0f727927c0136c63e8127ab5e9d3279fd5748dc4
Author: Adam Whitmore <[email protected]>
AuthorDate: Tue Sep 19 15:05:46 2023 -0400

    Return errors for insert_rows_json exceptions (#21080) (#28091)
---
 CHANGES.md                                         |   1 +
 sdks/python/apache_beam/io/gcp/bigquery_test.py    | 853 +++++++++++++++------
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   |  22 +-
 .../apache_beam/io/gcp/bigquery_write_it_test.py   |  51 +-
 4 files changed, 681 insertions(+), 246 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 47c35fe3491..cdf93909cb6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -82,6 +82,7 @@
 ## Bugfixes
 
 * Fixed exception chaining issue in GCS connector (Python) 
([#26769](https://github.com/apache/beam/issues/26769#issuecomment-1700422615)).
+* Fixed streaming inserts exception handling, GoogleAPICallErrors are now 
retried according to retry strategy and routed to failed rows where appropriate 
rather than causing a pipeline error (Python) 
([#21080](https://github.com/apache/beam/issues/21080)).
 
 
 ## Security Fixes
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 3a3033dfcaf..7e9c1e63474 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -77,7 +77,6 @@ from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.utils import retry
 
 # Protect against environments where bigquery library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
@@ -931,77 +930,269 @@ class TestWriteToBigQuery(unittest.TestCase):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     
/python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
+      param(
+          insert_response=[
+            exceptions.InternalServerError if exceptions else None,
+            exceptions.InternalServerError if exceptions else None],
+          error_reason='Internal Server Error', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
+      param(
+          insert_response=[
+            exceptions.Forbidden if exceptions else None,
+            exceptions.Forbidden if exceptions else None],
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+  ])
+  def test_insert_rows_json_exception_retry_always(
+      self, insert_response, error_reason, failed_rows):
+    # In this test, a pipeline will always retry all caught exception types
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+      mock_response = mock.Mock()
+      mock_response.reason = error_reason
+
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        # raise exception if insert_response element is an exception
+        if insert_response[call_counter]:
+          exception_type = insert_response[call_counter]
+          call_counter += 1
+          raise exception_type('some exception', response=mock_response)
+        # return empty list if not insert_response element, indicating
+        # successful call to insert_rows_json
+        else:
+          call_counter += 1
+          return []
+
+      client = mock.Mock()
+      client.insert_rows_json.side_effect = store_callback
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
+            p
+            | beam.Create([{
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
+            }])
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
+                create_disposition='CREATE_NEVER',
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
+
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     
/python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+  @parameterized.expand([
+      param(
+          # not in _NON_TRANSIENT_ERRORS
+          exception_type=exceptions.BadGateway if exceptions else None,
+          error_reason='Bad Gateway'),
+      param(
+          # in _NON_TRANSIENT_ERRORS
+          exception_type=exceptions.Unauthorized if exceptions else None,
+          error_reason='Unauthorized'),
+  ])
+  @mock.patch('time.sleep')
+  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+  def test_insert_rows_json_exception_retry_never(
+      self, mock_send, unused_mock_sleep, exception_type, error_reason):
+    # In this test, a pipeline will never retry caught exception types
+    # since RetryStrategy is set to RETRY_NEVER
+    mock_response = mock.Mock()
+    mock_response.reason = error_reason
+    mock_send.side_effect = [
+        exception_type('some exception', response=mock_response)
+    ]
+
+    with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_NEVER))
+      failed_values = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | beam.Map(lambda x: x[1]['columnA']))
+
+      assert_that(failed_values, equal_to(['value1', 'value2']))
+
+    self.assertEqual(1, mock_send.call_count)
+
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     
/python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+  @parameterized.expand([
       param(
           exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          error_reason='Deadline Exceeded', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2),
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
+          exception_type=exceptions.Conflict if exceptions else None,
+          error_reason='Conflict', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2),
       param(
-          exception_type=exceptions.InternalServerError if exceptions else 
None,
-          error_reason='internalError'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2),
       param(
           exception_type=exceptions.InternalServerError if exceptions else 
None,
-          error_reason='backendError'),
+          error_reason='Internal Server Error', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2),
+      param(
+          exception_type=exceptions.BadGateway if exceptions else None,
+          error_reason='Bad Gateway', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2),
+      param(
+          exception_type=exceptions.ServiceUnavailable if exceptions else None,
+          error_reason='Service Unavailable', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2),
+      param(
+          exception_type=exceptions.GatewayTimeout if exceptions else None,
+          error_reason='Gateway Timeout', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2),
+      param(
+          exception_type=exceptions.BadRequest if exceptions else None,
+          error_reason='Bad Request', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1),
+      param(
+          exception_type=exceptions.Unauthorized if exceptions else None,
+          error_reason='Unauthorized', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1),
+      param(
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1),
+      param(
+          exception_type=exceptions.NotFound if exceptions else None,
+          error_reason='Not Found', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1),
+      param(
+          exception_type=exceptions.MethodNotImplemented
+            if exceptions else None,
+          error_reason='Not Implemented', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1),
   ])
   @mock.patch('time.sleep')
   @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_structured_retriable(
+  def test_insert_rows_json_exception_retry_on_transient_error(
       self,
       mock_send,
       unused_mock_sleep,
-      exception_type=None,
-      error_reason=None):
-    # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
+      exception_type,
+      error_reason,
+      failed_values,
+      expected_call_count):
+    # In this test, a pipeline will only retry caught exception types
+    # with reasons that are not in _NON_TRANSIENT_ERRORS since RetryStrategy is
+    # set to RETRY_ON_TRANSIENT_ERROR
+    mock_response = mock.Mock()
+    mock_response.reason = error_reason
     mock_send.side_effect = [
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
+        exception_type('some exception', response=mock_response),
+        # Return no exception and no errors on 2nd call, if there is a 2nd call
+        []
     ]
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
-            p
-            | beam.Create([{
-                'columnA': 'value1'
-            }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
-                create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn('some retriable exception', exc.exception.args[0])
+    with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))
+      failed_values_out = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+          | beam.Map(lambda x: x[1]['columnA']))
+
+      assert_that(failed_values_out, equal_to(failed_values))
+    self.assertEqual(expected_call_count, mock_send.call_count)
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with persistent exceptions with exception types not
+  # caught in BigQueryWrapper._insert_all_rows but retriable by
+  # retry.with_exponential_backoff
   @parameterized.expand([
       param(
           exception_type=requests.exceptions.ConnectionError,
@@ -1009,28 +1200,18 @@ class 
BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
       param(
           exception_type=requests.exceptions.Timeout,
           error_message='some timeout error'),
-      param(
-          exception_type=ConnectionError,
-          error_message='some py connection error'),
-      param(
-          exception_type=exceptions.BadGateway if exceptions else None,
-          error_message='some badgateway error'),
   ])
   @mock.patch('time.sleep')
   @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_unstructured_retriable(
-      self,
-      mock_send,
-      unused_mock_sleep,
-      exception_type=None,
-      error_message=None):
-    # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
-    mock_send.side_effect = [
-        exception_type(error_message),
-        exception_type(error_message),
-        exception_type(error_message),
-        exception_type(error_message),
-    ]
+  def test_insert_rows_json_persistent_retriable_exception(
+      self, mock_send, unused_mock_sleep, exception_type, error_message):
+    # In this test, each insert_rows_json call will result in an exception
+    # and be retried with retry.with_exponential_backoff until MAX_RETRIES is
+    # reached
+    mock_send.side_effect = exception_type(error_message)
+
+    # Expecting 1 initial call plus maximum number of retries
+    expected_call_count = 1 + bigquery_tools.MAX_RETRIES
 
     with self.assertRaises(Exception) as exc:
       with beam.Pipeline() as p:
@@ -1038,6 +1219,8 @@ class 
BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
             p
             | beam.Create([{
                 'columnA': 'value1'
+            }, {
+                'columnA': 'value2'
             }])
             | WriteToBigQuery(
                 table='project:dataset.table',
@@ -1048,138 +1231,390 @@ class 
BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
                 },
                 create_disposition='CREATE_NEVER',
                 method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
+
+    self.assertEqual(expected_call_count, mock_send.call_count)
     self.assertIn(error_message, exc.exception.args[0])
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with intermittent exceptions with exception types not
+  # caught in BigQueryWrapper._insert_all_rows but retriable by
+  # retry.with_exponential_backoff
   @parameterized.expand([
       param(
-          exception_type=retry.PermanentException,
-          error_args=('nonretriable', )),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=(
-              'forbidden morbidden', [{
-                  'reason': 'nonretriablereason'
-              }])),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=('BAD REQUEST!', [{
-              'reason': 'nonretriablereason'
-          }])),
-      param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=(
-              'method not allowed!', [{
-                  'reason': 'nonretriablereason'
-              }])),
-      param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=('method not allowed!', 'args')),
-      param(
-          exception_type=exceptions.Unknown if exceptions else None,
-          error_args=('unknown!', 'args')),
+          exception_type=requests.exceptions.ConnectionError,
+          error_message='some connection error'),
       param(
-          exception_type=exceptions.Aborted if exceptions else None,
-          error_args=('abortet!', 'abort')),
+          exception_type=requests.exceptions.Timeout,
+          error_message='some timeout error'),
   ])
   @mock.patch('time.sleep')
   @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_unretriable_errors(
-      self, mock_send, unused_mock_sleep, exception_type=None, 
error_args=None):
-    # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+  def test_insert_rows_json_intermittent_retriable_exception(
+      self, mock_send, unused_mock_sleep, exception_type, error_message):
+    # In this test, the first 2 insert_rows_json calls will result in an
+    # exception and be retried with retry.with_exponential_backoff. The last
+    # call will not raise an exception and will succeed.
     mock_send.side_effect = [
-        exception_type(*error_args),
-        exception_type(*error_args),
-        exception_type(*error_args),
-        exception_type(*error_args),
+        exception_type(error_message), exception_type(error_message), []
     ]
 
-    with self.assertRaises(Exception):
-      with beam.Pipeline() as p:
-        _ = (
+    with beam.Pipeline() as p:
+      _ = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS'))
+
+    self.assertEqual(3, mock_send.call_count)
+
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error 
list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+  @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1
+      # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st run
+      # row 1 sent to failed_rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }, {
+                  'index': 1, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1']),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error succeeds on second attempt, 0 rows sent to failed rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [],
+          ],
+          failed_rows=[]),
+  ])
+  def test_insert_rows_json_errors_retry_always(
+      self, insert_response, failed_rows, unused_sleep_mock=None):
+    # In this test, a pipeline will always retry all errors
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        response = insert_response[call_counter]
+        call_counter += 1
+        return response
+
+      client = mock.Mock()
+      client.insert_rows_json = mock.Mock(side_effect=store_callback)
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(1, mock_send.call_count)
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
+
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error 
list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=retry.PermanentException,
-          error_args=('nonretriable', )),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=(
-              'forbidden morbidden', [{
-                  'reason': 'nonretriablereason'
-              }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalidQuery'
+                  }]
+              }],
+          ],
+          streaming=False),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=('BAD REQUEST!', [{
-              'reason': 'nonretriablereason'
-          }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+          ],
+          streaming=False),
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=(
-              'method not allowed!', [{
-                  'reason': 'nonretriablereason'
-              }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          streaming=True),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=('method not allowed!', 'args')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+          ],
+          streaming=True),
+  ])
+  @mock.patch('time.sleep')
+  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+  def test_insert_rows_json_errors_retry_never(
+      self, mock_send, unused_mock_sleep, insert_response, streaming):
+    # In this test, a pipeline will never retry errors since RetryStrategy is
+    # set to RETRY_NEVER
+    mock_send.side_effect = insert_response
+    opt = StandardOptions()
+    opt.streaming = streaming
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_NEVER))
+      failed_values = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | beam.Map(lambda x: x[1]['columnA']))
+
+      assert_that(failed_values, equal_to(['value1']))
+
+    self.assertEqual(1, mock_send.call_count)
+
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error 
list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+  @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.Unknown if exceptions else None,
-          error_args=('unknown!', 'args')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1'],
+          streaming=False),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on 1st attempt
+      # transient error succeeds on 2nd attempt, 0 rows sent to failed rows
       param(
-          exception_type=exceptions.Aborted if exceptions else None,
-          error_args=('abortet!', 'abort')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [],
+          ],
+          failed_rows=[],
+          streaming=False),
+      # reason in _NON_TRANSIENT_ERRORS for row 1
+      # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st and 2nd attempt
+      # all rows with errors are retried when any row has a retriable error
+      # row 1 sent to failed_rows after final attempt
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_args=('some connection error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }, {
+                  'index': 1, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [
+                  {
+                      'index': 0, 'errors': [{
+                          'reason': 'invalid'
+                      }]
+                  },
+              ],
+          ],
+          failed_rows=['value1'],
+          streaming=False),
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_args=('some timeout error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1'],
+          streaming=True),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on 1st attempt
+      # transient error succeeds on 2nd attempt, 0 rows sent to failed rows
       param(
-          exception_type=ConnectionError,
-          error_args=('some py connection error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [],
+          ],
+          failed_rows=[],
+          streaming=True),
+      # reason in _NON_TRANSIENT_ERRORS for row 1
+      # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st and 2nd attempt
+      # all rows with errors are retried when any row has a retriable error
+      # row 1 sent to failed_rows after final attempt
       param(
-          exception_type=exceptions.BadGateway if exceptions else None,
-          error_args=('some badgateway error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }, {
+                  'index': 1, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [
+                  {
+                      'index': 0, 'errors': [{
+                          'reason': 'invalid'
+                      }]
+                  },
+              ],
+          ],
+          failed_rows=['value1'],
+          streaming=True),
   ])
   @mock.patch('time.sleep')
   @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_unretriable_errors_streaming(
-      self, mock_send, unused_mock_sleep, exception_type=None, 
error_args=None):
-    # In this test, a STREAMING pipeline will retry ALL errors, and never throw
-    # an exception.
-    mock_send.side_effect = [
-        exception_type(*error_args),
-        exception_type(*error_args),
-        []  # Errors thrown twice, and then succeeded
-    ]
+  def test_insert_rows_json_errors_retry_on_transient_error(
+      self,
+      mock_send,
+      unused_mock_sleep,
+      insert_response,
+      failed_rows,
+      streaming=False):
+    # In this test, a pipeline will only retry errors with reasons that are not
+    # in _NON_TRANSIENT_ERRORS since RetryStrategy is set to
+    # RETRY_ON_TRANSIENT_ERROR
+    call_counter = 0
+
+    def store_callback(table, **kwargs):
+      nonlocal call_counter
+      response = insert_response[call_counter]
+      call_counter += 1
+      return response
+
+    mock_send.side_effect = store_callback
 
     opt = StandardOptions()
-    opt.streaming = True
+    opt.streaming = streaming
+
+    # Using the bundle based direct runner to avoid pickling problems
+    # with mocks.
     with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
-      _ = (
+      bq_write_out = (
           p
           | beam.Create([{
               'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }, {
+              'columnA': 'value3'
           }])
           | WriteToBigQuery(
               table='project:dataset.table',
@@ -1189,8 +1624,14 @@ class 
BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
                   }]
               },
               create_disposition='CREATE_NEVER',
-              method='STREAMING_INSERTS'))
-    self.assertEqual(3, mock_send.call_count)
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))
+
+      failed_values = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+          | beam.Map(lambda x: x[1]['columnA']))
+
+      assert_that(failed_values, equal_to(failed_rows))
 
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
@@ -1499,76 +1940,6 @@ class 
PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
           result)
       self.assertEqual(len(data1['colA_values']), 1)
 
-  @parameterized.expand([
-      param(retry_strategy=RetryStrategy.RETRY_ALWAYS),
-      param(retry_strategy=RetryStrategy.RETRY_NEVER),
-      param(retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR),
-  ])
-  def test_permanent_failure_in_some_rows_does_not_duplicate(
-      self, unused_sleep_mock=None, retry_strategy=None):
-    with mock.patch('time.sleep'):
-
-      def store_callback(table, **kwargs):
-        return [
-            {
-                'index': 0,
-                'errors': [{
-                    'reason': 'invalid'
-                }, {
-                    'reason': 'its bad'
-                }]
-            },
-        ]
-
-      client = mock.Mock()
-      client.insert_rows_json = mock.Mock(side_effect=store_callback)
-
-      # The expected rows to be inserted according to the insert strategy
-      if retry_strategy == RetryStrategy.RETRY_NEVER:
-        inserted_rows = ['value3', 'value5']
-      else:  # RETRY_ALWAYS and RETRY_ON_TRANSIENT_ERRORS should insert all 
rows
-        inserted_rows = ['value3', 'value5']
-
-      # Using the bundle based direct runner to avoid pickling problems
-      # with mocks.
-      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
-        bq_write_out = (
-            p
-            | beam.Create([{
-                'columnA': 'value1', 'columnB': 'value2'
-            }, {
-                'columnA': 'value3', 'columnB': 'value4'
-            }, {
-                'columnA': 'value5', 'columnB': 'value6'
-            }])
-            | _StreamToBigQuery(
-                table_reference='project:dataset.table',
-                table_side_inputs=[],
-                schema_side_inputs=[],
-                schema='anyschema',
-                batch_size=None,
-                triggering_frequency=None,
-                create_disposition='CREATE_NEVER',
-                write_disposition=None,
-                kms_key=None,
-                retry_strategy=retry_strategy,
-                additional_bq_parameters=[],
-                ignore_insert_ids=False,
-                ignore_unknown_columns=False,
-                with_auto_sharding=False,
-                test_client=client,
-                max_retries=10,
-                num_streaming_keys=500))
-
-        failed_values = (
-            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
-            | beam.Map(lambda x: x[1]['columnA']))
-
-        assert_that(
-            failed_values,
-            equal_to(
-                list({'value1', 'value3', 
'value5'}.difference(inserted_rows))))
-
   @parameterized.expand([
       param(with_auto_sharding=False),
       param(with_auto_sharding=True),
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py 
b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 07d711f8fc9..2f942079528 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -732,11 +732,13 @@ class BigQueryWrapper(object):
     except (ClientError, GoogleAPICallError) as e:
       # e.code contains the numeric http status code.
       service_call_metric.call(e.code)
-      # Re-reise the exception so that we re-try appropriately.
-      raise
+      # Package exception with required fields
+      error = {'message': e.message, 'reason': e.response.reason}
+      # Add all rows to the errors list along with the error
+      errors = [{"index": i, "errors": [error]} for i, _ in enumerate(rows)]
     except HttpError as e:
       service_call_metric.call(e)
-      # Re-reise the exception so that we re-try appropriately.
+      # Re-raise the exception so that we re-try appropriately.
       raise
     finally:
       self._latency_histogram_metric.update(
@@ -1491,7 +1493,19 @@ class RetryStrategy(object):
   RETRY_NEVER = 'RETRY_NEVER'
   RETRY_ON_TRANSIENT_ERROR = 'RETRY_ON_TRANSIENT_ERROR'
 
-  _NON_TRANSIENT_ERRORS = {'invalid', 'invalidQuery', 'notImplemented'}
+  # Values below may be found in reasons provided either in an
+  # error returned by a client method or by an http response as
+  # defined in google.api_core.exceptions
+  _NON_TRANSIENT_ERRORS = {
+      'invalid',
+      'invalidQuery',
+      'notImplemented',
+      'Bad Request',
+      'Unauthorized',
+      'Forbidden',
+      'Not Found',
+      'Not Implemented',
+  }
 
   @staticmethod
   def should_retry(strategy, error_message):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index c73d3ff7e53..4b728fe7ec1 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -379,7 +379,7 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
   def test_big_query_write_insert_errors_reporting(self):
     """
     Test that errors returned by beam.io.WriteToBigQuery
-    contain both the failed rows amd the reason for it failing.
+    contain both the failed rows and the reason for it failing.
     """
     table_name = 'python_write_table'
     table_id = '{}.{}'.format(self.dataset_id, table_name)
@@ -454,6 +454,55 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
           | 'ParseErrors' >> beam.Map(lambda err: (err[1], err[2])),
           equal_to(bq_result_errors))
 
+  @pytest.mark.it_postcommit
+  def test_big_query_write_insert_non_transient_api_call_error(self):
+    """
+    Test that non-transient GoogleAPICallError errors returned 
+    by beam.io.WriteToBigQuery are not retried and result in
+    FAILED_ROWS containing both the failed rows and the reason
+    for failure.
+    """
+    table_name = 'this_table_does_not_exist'
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    input_data = [{
+        'number': 1,
+        'str': 'some_string',
+    }]
+
+    table_schema = {
+        "fields": [{
+            "name": "number", "type": "INTEGER", 'mode': 'NULLABLE'
+        }, {
+            "name": "str", "type": "STRING", 'mode': 'NULLABLE'
+        }]
+    }
+
+    bq_result_errors = [({
+        'number': 1,
+        'str': 'some_string',
+    }, "Not Found")]
+
+    args = self.test_pipeline.get_full_options_as_args()
+
+    with beam.Pipeline(argv=args) as p:
+      # pylint: disable=expression-not-assigned
+      errors = (
+          p | 'create' >> beam.Create(input_data)
+          | 'write' >> beam.io.WriteToBigQuery(
+              table_id,
+              schema=table_schema,
+              method='STREAMING_INSERTS',
+              insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
+              create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
+              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+
+      assert_that(
+          errors[BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          |
+          'ParseErrors' >> beam.Map(lambda err: (err[1], err[2][0]["reason"])),
+          equal_to(bq_result_errors))
+
   @pytest.mark.it_postcommit
   @parameterized.expand([
       param(file_format=FileFormat.AVRO),


Reply via email to