This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 725a2d62b27 Feature/add retry to gcp auth (#28475)
725a2d62b27 is described below
commit 725a2d62b27f657a0f5ab8d279508753de0908d1
Author: johnjcasey <[email protected]>
AuthorDate: Wed Sep 27 11:14:06 2023 -0400
Feature/add retry to gcp auth (#28475)
* Update 2.50 release notes to include new Kafka topicPattern feature
* Create groovy class for io performance tests
Create gradle task and github actions config for GCS using this.
* delete unnecessary class
* fix env call
* fix call to gradle
* run on hosted runner for testing
* add additional checkout
* add destination for triggered tests
* move env variables to correct location
* try uploading against separate dataset
* try without a user
* update branch checkout, try to view the failure log
* run on failure
* update to use correct BigQuery instance
* convert to matrix
* add result reporting
* add failure clause
* remove failure clause, update to run on self-hosted
* address comments, clean up build
* clarify branching
* Update auth to retry getting credentials from GCE
* Re-order imports
* Add test case
* Update exception log
* Add failure test
* Update removal of retrying method
* rework via mock
* Clear credentials cache for idempotent tests
* Remove handler after test
Change retry timeout to facilitate shorter retries for anonymous access cases
* Change retry timeout to facilitate shorter retries for anonymous access
cases
* reset credentials before and after test
---
sdks/python/apache_beam/internal/gcp/auth.py | 14 ++-
sdks/python/apache_beam/internal/gcp/auth_test.py | 135 ++++++++++++++++++++++
2 files changed, 146 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py
index 7e54ba0a4ba..b2fda2c6e89 100644
--- a/sdks/python/apache_beam/internal/gcp/auth.py
+++ b/sdks/python/apache_beam/internal/gcp/auth.py
@@ -26,6 +26,7 @@ from typing import Optional
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.utils import retry
# google.auth is only available when Beam is installed with the gcp extra.
try:
@@ -152,8 +153,7 @@ class _Credentials(object):
try:
# pylint: disable=c-extension-no-member
- credentials, _ = google.auth.default(
- scopes=pipeline_options.view_as(GoogleCloudOptions).gcp_oauth_scopes)
+ credentials = _Credentials._get_credentials_with_retrys(pipeline_options)
credentials = _Credentials._add_impersonation_credentials(
credentials, pipeline_options)
credentials = _ApitoolsCredentialsAdapter(credentials)
@@ -164,10 +164,18 @@ class _Credentials(object):
except Exception as e:
_LOGGER.warning(
'Unable to find default credentials to use: %s\n'
- 'Connecting anonymously.',
+ 'Connecting anonymously. This is expected if no '
+ 'credentials are needed to access GCP resources.',
e)
return None
+  @staticmethod
+  @retry.with_exponential_backoff(num_retries=4, initial_delay_secs=2)
+  def _get_credentials_with_retrys(pipeline_options):
+    """Looks up application-default credentials for the pipeline.
+
+    Calls ``google.auth.default`` with the OAuth scopes configured in the
+    pipeline's ``GoogleCloudOptions``. The call is wrapped in
+    ``retry.with_exponential_backoff(num_retries=4, initial_delay_secs=2)``,
+    so a failing lookup is re-attempted with backoff before an error reaches
+    the caller.
+
+    Args:
+      pipeline_options: the ``PipelineOptions`` supplying the OAuth scopes.
+
+    Returns:
+      Only the credentials object; the second element of the tuple returned
+      by ``google.auth.default`` is discarded.
+    """
+    cloud_options = pipeline_options.view_as(GoogleCloudOptions)
+    default_credentials, _unused_project = google.auth.default(
+        scopes=cloud_options.gcp_oauth_scopes)
+    return default_credentials
+
@staticmethod
def _add_impersonation_credentials(credentials, pipeline_options):
gcs_options = pipeline_options.view_as(GoogleCloudOptions)
diff --git a/sdks/python/apache_beam/internal/gcp/auth_test.py b/sdks/python/apache_beam/internal/gcp/auth_test.py
new file mode 100644
index 00000000000..98fb828875b
--- /dev/null
+++ b/sdks/python/apache_beam/internal/gcp/auth_test.py
@@ -0,0 +1,135 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import unittest
+
+import mock
+
+from apache_beam.internal.gcp import auth
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+
+try:
+ import google.auth as gauth
+except ImportError:
+ gauth = None
+
+
+class MockLoggingHandler(logging.Handler):
+  """Logging handler that records emitted messages, grouped by level.
+
+  ``messages`` maps a lower-cased level name (e.g. ``'warning'``) to the list
+  of ``record.getMessage()`` strings emitted at that level, in emission order.
+  """
+  def __init__(self, *args, **kwargs):
+    logging.Handler.__init__(self, *args, **kwargs)
+    self.reset()
+
+  def emit(self, record):
+    # setdefault keeps records whose level name is not one of the standard
+    # five (custom levels appear as e.g. 'Level 5') from raising KeyError
+    # back through the logging call under test.
+    self.messages.setdefault(record.levelname.lower(), []).append(
+        record.getMessage())
+
+  def reset(self):
+    """Discards all recorded messages, keeping the standard level keys."""
+    self.messages = {
+        'debug': [],
+        'info': [],
+        'warning': [],
+        'error': [],
+        'critical': [],
+    }
+
+
+@unittest.skipIf(gauth is None, 'Google Auth dependencies are not installed')
+class AuthTest(unittest.TestCase):
+  """Tests for credential lookup in apache_beam.internal.gcp.auth."""
+  @staticmethod
+  def _reset_credentials_cache():
+    # _Credentials caches the credentials it resolves; clear that cache so
+    # each test performs a fresh lookup and leaves no state behind.
+    if auth._Credentials._credentials_init:
+      auth._Credentials._credentials_init = False
+      auth._Credentials._credentials = None
+
+  def setUp(self):
+    self._reset_credentials_cache()
+    # Cleanups run even when an assertion fails, unlike trailing code.
+    self.addCleanup(self._reset_credentials_cache)
+
+  @mock.patch('google.auth.default')
+  def test_auth_with_retrys(self, mock_default):
+    pipeline_options = PipelineOptions()
+    pipeline_options.view_as(
+        GoogleCloudOptions).impersonate_service_account = False
+
+    # The first lookup fails and the second succeeds, so credentials are only
+    # returned if the retry wrapper re-attempts the call.
+    mock_default.side_effect = [IOError('Failed'), ('creds', 1)]
+
+    returned_credentials = auth.get_service_credentials(pipeline_options)
+
+    self.assertEqual(2, mock_default.call_count)
+    self.assertEqual('creds', returned_credentials._google_auth_credentials)
+
+  @mock.patch(
+      'apache_beam.internal.gcp.auth._Credentials._get_credentials_with_retrys')
+  def test_auth_with_retrys_always_fail(self, mock_get_credentials):
+    pipeline_options = PipelineOptions()
+    pipeline_options.view_as(
+        GoogleCloudOptions).impersonate_service_account = False
+
+    logger_handler = MockLoggingHandler()
+    auth._LOGGER.addHandler(logger_handler)
+    self.addCleanup(auth._LOGGER.removeHandler, logger_handler)
+
+    # Replace the retrying lookup itself: driving the real exponential
+    # backoff to exhaustion would make this test needlessly slow.
+    mock_get_credentials.side_effect = IOError('Failed')
+
+    returned_credentials = auth.get_service_credentials(pipeline_options)
+
+    self.assertIsNone(returned_credentials)
+    self.assertEqual([
+        'Unable to find default credentials to use: Failed\n'
+        'Connecting anonymously. This is expected if no credentials are '
+        'needed to access GCP resources.'
+    ],
+                     logger_handler.messages.get('warning'))
+
+
+# Run this module's tests when it is executed directly as a script.
+if __name__ == '__main__':
+ unittest.main()