ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327119144


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, 
error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     
/python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else 
None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else 
None,
-          error_reason='backendError'),
+          insert_response=[
+            exceptions.Forbidden if exceptions else None,
+            exceptions.Forbidden if exceptions else None],
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
   ])
-  @mock.patch('time.sleep')
-  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_structured_retriable(
-      self,
-      mock_send,
-      unused_mock_sleep,
-      exception_type=None,
-      error_reason=None):
-    # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
-    mock_send.side_effect = [
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-    ]
+  def test_insert_rows_json_exception_retry_always(
+      self, insert_response, error_reason, failed_rows):
+    # In this test, a pipeline will always retry all caught exception types
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+      mock_response = mock.Mock()
+      mock_response.reason = error_reason
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        # raise exception if insert_response element is an exception
+        if insert_response[call_counter]:
+          exception_type = insert_response[call_counter]
+          call_counter += 1
+          raise exception_type('some exception', response=mock_response)
+        # return empty list if not insert_response element, indicating
+        # successful call to insert_rows_json
+        else:
+          call_counter += 1
+          return []
+
+      client = mock.Mock()
+      client.insert_rows_json.side_effect = store_callback
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn('some retriable exception', exc.exception.args[0])
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     
/python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_message='some connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_message='some timeout error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=ConnectionError,
-          error_message='some py connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=True),
       param(
-          exception_type=exceptions.BadGateway if exceptions else None,
-          error_message='some badgateway error'),

Review Comment:
   Looking closer at this - I can add these back, but I still think they might 
be redundant. I think the tests in this change cover the possible code paths. 
This is my line of thinking, let me know where I'm off track:
   - All of the existing exception tests are mocking 
`google.cloud.bigquery.Client.insert_rows_json`
   - There are 2 scenarios we need to handle when calling that method with regard 
to errors/exceptions:
     - the method returns an error list response 
[here](https://github.com/apache/beam/blob/dbb657371ca6a23b8cd1216c10686a144439c91e/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L720)
     - the method raises a `GoogleAPICallError` exception 
[here](https://github.com/apache/beam/blob/dbb657371ca6a23b8cd1216c10686a144439c91e/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L732)
   
   In each of those scenarios, a different path can be taken based on whether 
the reason is in `_NON_TRANSIENT_ERRORS`, so the tests in this change exercise 
both transient/non-transient to cover both paths.
   
   The tests are re-organized in this change to try to make it clear which path 
is being tested, with `test_insert_rows_json_errors_*` tests for the first 
scenario (an error list response) and `test_insert_rows_json_exception_*` tests 
for the 2nd scenario (a raised exception). Each scenario is also exercised with 
the different retry strategies and with both transient and non-transient 
errors, so that those paths are covered in both scenarios.
   
   I don't mean to be a pain on this and certainly may be missing something, so 
I will defer to project committers on this, just wanted to see if I can 
understand what I'm missing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to