ahmedabu98 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327144446


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, 
error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     
/python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else 
None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else 
None,
-          error_reason='backendError'),
+          insert_response=[
+            exceptions.Forbidden if exceptions else None,
+            exceptions.Forbidden if exceptions else None],
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
   ])
-  @mock.patch('time.sleep')
-  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_structured_retriable(
-      self,
-      mock_send,
-      unused_mock_sleep,
-      exception_type=None,
-      error_reason=None):
-    # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
-    mock_send.side_effect = [
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-    ]
+  def test_insert_rows_json_exception_retry_always(
+      self, insert_response, error_reason, failed_rows):
+    # In this test, a pipeline will always retry all caught exception types
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+      mock_response = mock.Mock()
+      mock_response.reason = error_reason
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        # raise exception if insert_response element is an exception
+        if insert_response[call_counter]:
+          exception_type = insert_response[call_counter]
+          call_counter += 1
+          raise exception_type('some exception', response=mock_response)
+        # return empty list if not insert_response element, indicating
+        # successful call to insert_rows_json
+        else:
+          call_counter += 1
+          return []
+
+      client = mock.Mock()
+      client.insert_rows_json.side_effect = store_callback
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn('some retriable exception', exc.exception.args[0])
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     
/python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_message='some connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_message='some timeout error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=ConnectionError,
-          error_message='some py connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=True),
       param(
-          exception_type=exceptions.BadGateway if exceptions else None,
-          error_message='some badgateway error'),

Review Comment:
   You're certainly right that fundamentally we should be focusing on those two 
code paths. I agree with not bringing those tests back, but maybe the 
exceptions themselves should be spread out across your tests. For example, we 
can replace one of the many `exceptions.TooManyRequests` in your test cases 
with  `exceptions.Timeout`, and in another test case replace it with 
`exceptions.BadGateway` etc. So the tests you wrote can stay where they are, 
but just the parameters can change to include a wider variety of exceptions.
   
   Although these exceptions ultimately take the same code path in the end, the 
variety of exceptions tested can guard against future unwanted changes to 
`_NON_TRANSIENT_ERRORS`. (e.g. someone making `Timeout` a non-transient error)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to