This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 725a2d62b27 Feature/add retry to gcp auth (#28475)
725a2d62b27 is described below

commit 725a2d62b27f657a0f5ab8d279508753de0908d1
Author: johnjcasey <[email protected]>
AuthorDate: Wed Sep 27 11:14:06 2023 -0400

    Feature/add retry to gcp auth (#28475)
    
    * Update 2.50 release notes to include new Kafka topicPattern feature
    
    * Create groovy class for io performance tests
    Create gradle task and github actions config for GCS using this.
    
    * delete unnecessary class
    
    * fix env call
    
    * fix call to gradle
    
    * run on hosted runner for testing
    
    * add additional checkout
    
    * add destination for triggered tests
    
    * move env variables to correct location
    
    * try uploading against separate dataset
    
    * try without a user
    
    * update branch checkout, try to view the failure log
    
    * run on failure
    
    * update to use correct BigQuery instance
    
    * convert to matrix
    
    * add result reporting
    
    * add failure clause
    
    * remove failure clause, update to run on self-hosted
    
    * address comments, clean up build
    
    * clarify branching
    
    * Update auth to retry getting credentials from GCE
    
    * Re-order imports
    
    * Add test case
    
    * Update exception log
    
    * Add failure test
    
    * Update removal of retrying method
    
    * rework via mock
    
    * Clear credentials cache for idempotent tests
    
    * Remove handler after test
    Change retry timeout to facilitate shorter retrys for anonymous access cases
    
    * Change retry timeout to facilitate shorter retrys for anonymous access cases
    
    * reset credentials before and after test
---
 sdks/python/apache_beam/internal/gcp/auth.py      |  14 ++-
 sdks/python/apache_beam/internal/gcp/auth_test.py | 135 ++++++++++++++++++++++
 2 files changed, 146 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py
index 7e54ba0a4ba..b2fda2c6e89 100644
--- a/sdks/python/apache_beam/internal/gcp/auth.py
+++ b/sdks/python/apache_beam/internal/gcp/auth.py
@@ -26,6 +26,7 @@ from typing import Optional
 
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.utils import retry
 
 # google.auth is only available when Beam is installed with the gcp extra.
 try:
@@ -152,8 +153,7 @@ class _Credentials(object):
 
     try:
       # pylint: disable=c-extension-no-member
-      credentials, _ = google.auth.default(
-          scopes=pipeline_options.view_as(GoogleCloudOptions).gcp_oauth_scopes)
+      credentials = _Credentials._get_credentials_with_retrys(pipeline_options)
       credentials = _Credentials._add_impersonation_credentials(
           credentials, pipeline_options)
       credentials = _ApitoolsCredentialsAdapter(credentials)
@@ -164,10 +164,18 @@ class _Credentials(object):
     except Exception as e:
       _LOGGER.warning(
           'Unable to find default credentials to use: %s\n'
-          'Connecting anonymously.',
+          'Connecting anonymously. This is expected if no '
+          'credentials are needed to access GCP resources.',
           e)
       return None
 
+  @staticmethod
+  @retry.with_exponential_backoff(num_retries=4, initial_delay_secs=2)
+  def _get_credentials_with_retrys(pipeline_options):
+    credentials, _ = google.auth.default(
+      scopes=pipeline_options.view_as(GoogleCloudOptions).gcp_oauth_scopes)
+    return credentials
+
   @staticmethod
   def _add_impersonation_credentials(credentials, pipeline_options):
     gcs_options = pipeline_options.view_as(GoogleCloudOptions)
diff --git a/sdks/python/apache_beam/internal/gcp/auth_test.py b/sdks/python/apache_beam/internal/gcp/auth_test.py
new file mode 100644
index 00000000000..98fb828875b
--- /dev/null
+++ b/sdks/python/apache_beam/internal/gcp/auth_test.py
@@ -0,0 +1,135 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import unittest
+
+import mock
+
+from apache_beam.internal.gcp import auth
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+
+try:
+  import google.auth as gauth
+except ImportError:
+  gauth = None
+
+
+class MockLoggingHandler(logging.Handler):
+  """Mock logging handler to check for expected logs."""
+  def __init__(self, *args, **kwargs):
+    self.reset()
+    logging.Handler.__init__(self, *args, **kwargs)
+
+  def emit(self, record):
+    self.messages[record.levelname.lower()].append(record.getMessage())
+
+  def reset(self):
+    self.messages = {
+        'debug': [],
+        'info': [],
+        'warning': [],
+        'error': [],
+        'critical': [],
+    }
+
+
[email protected](gauth is None, 'Google Auth dependencies are not installed')
+class AuthTest(unittest.TestCase):
+  @mock.patch('google.auth.default')
+  def test_auth_with_retrys(self, unused_mock_arg):
+    pipeline_options = PipelineOptions()
+    pipeline_options.view_as(
+        GoogleCloudOptions).impersonate_service_account = False
+
+    credentials = ('creds', 1)
+
+    self.is_called = False
+
+    def side_effect(scopes=None):
+      if self.is_called:
+        return credentials
+      else:
+        self.is_called = True
+        raise IOError('Failed')
+
+    google_auth_mock = mock.MagicMock()
+    gauth.default = google_auth_mock
+    google_auth_mock.side_effect = side_effect
+
+    # _Credentials caches the actual credentials.
+    # This resets it for idempotent tests.
+    if auth._Credentials._credentials_init:
+      auth._Credentials._credentials_init = False
+      auth._Credentials._credentials = None
+
+    returned_credentials = auth.get_service_credentials(pipeline_options)
+
+    # _Credentials caches the actual credentials.
+    # This resets it for idempotent tests.
+    if auth._Credentials._credentials_init:
+      auth._Credentials._credentials_init = False
+      auth._Credentials._credentials = None
+
+    self.assertEqual('creds', returned_credentials._google_auth_credentials)
+
+  @mock.patch(
+      'apache_beam.internal.gcp.auth._Credentials._get_credentials_with_retrys')
+  def test_auth_with_retrys_always_fail(self, unused_mock_arg):
+    pipeline_options = PipelineOptions()
+    pipeline_options.view_as(
+        GoogleCloudOptions).impersonate_service_account = False
+
+    loggerHandler = MockLoggingHandler()
+
+    auth._LOGGER.addHandler(loggerHandler)
+
+    #Remove call to retrying method, as otherwise test takes ~10 minutes to run
+    def raise_(scopes=None):
+      raise IOError('Failed')
+
+    retry_auth_mock = mock.MagicMock()
+    auth._Credentials._get_credentials_with_retrys = retry_auth_mock
+    retry_auth_mock.side_effect = raise_
+
+    # _Credentials caches the actual credentials.
+    # This resets it for idempotent tests.
+    if auth._Credentials._credentials_init:
+      auth._Credentials._credentials_init = False
+      auth._Credentials._credentials = None
+
+    returned_credentials = auth.get_service_credentials(pipeline_options)
+
+    self.assertEqual(None, returned_credentials)
+    self.assertEqual([
+        'Unable to find default credentials to use: Failed\n'
+        'Connecting anonymously. This is expected if no credentials are '
+        'needed to access GCP resources.'
+    ],
+                     loggerHandler.messages.get('warning'))
+
+    # _Credentials caches the actual credentials.
+    # This resets it for idempotent tests.
+    if auth._Credentials._credentials_init:
+      auth._Credentials._credentials_init = False
+      auth._Credentials._credentials = None
+
+    auth._LOGGER.removeHandler(loggerHandler)
+
+
+if __name__ == '__main__':
+  unittest.main()

Reply via email to