This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 27b3a22e34 Introduce anonymous credentials in GCP base hook (#39695)
27b3a22e34 is described below
commit 27b3a22e341468855c4ef368015ad946a59aa2e3
Author: Shahar Epstein <[email protected]>
AuthorDate: Mon May 20 00:20:38 2024 +0300
Introduce anonymous credentials in GCP base hook (#39695)
---
.../google/cloud/utils/credentials_provider.py | 73 ++++++++++++----------
.../providers/google/common/hooks/base_google.py | 16 +++--
.../cloud/utils/test_credentials_provider.py | 12 ++--
.../google/common/hooks/test_base_google.py | 32 ++++++++--
4 files changed, 84 insertions(+), 49 deletions(-)
diff --git a/airflow/providers/google/cloud/utils/credentials_provider.py
b/airflow/providers/google/cloud/utils/credentials_provider.py
index 06f2b5c4d8..b3140c2da0 100644
--- a/airflow/providers/google/cloud/utils/credentials_provider.py
+++ b/airflow/providers/google/cloud/utils/credentials_provider.py
@@ -28,9 +28,9 @@ from typing import Collection, Generator, Sequence
from urllib.parse import urlencode
import google.auth
-import google.auth.credentials
import google.oauth2.service_account
from google.auth import impersonated_credentials # type: ignore[attr-defined]
+from google.auth.credentials import AnonymousCredentials, Credentials
from google.auth.environment_vars import CREDENTIALS, LEGACY_PROJECT, PROJECT
from airflow.exceptions import AirflowException
@@ -178,6 +178,7 @@ class _CredentialProvider(LoggingMixin):
:param key_secret_name: Keyfile Secret Name in GCP Secret Manager.
:param key_secret_project_id: Project ID to read the secrets from. If not
passed, the project ID from
default credentials will be used.
+ :param credential_config_file: File path to or content of a GCP credential
configuration file.
:param scopes: OAuth scopes for the connection
:param delegate_to: The account to impersonate using domain-wide
delegation of authority,
if any. For this to work, the service account making the request must
have
@@ -192,6 +193,8 @@ class _CredentialProvider(LoggingMixin):
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
and target_principal
granting the role to the last account from the list.
+ :param is_anonymous: Provides an anonymous set of credentials,
+ which is useful for APIs which do not require authentication.
"""
def __init__(
@@ -206,13 +209,14 @@ class _CredentialProvider(LoggingMixin):
disable_logging: bool = False,
target_principal: str | None = None,
delegates: Sequence[str] | None = None,
+ is_anonymous: bool | None = None,
) -> None:
super().__init__()
- key_options = [key_path, key_secret_name, keyfile_dict]
+ key_options = [key_path, keyfile_dict, credential_config_file,
key_secret_name, is_anonymous]
if len([x for x in key_options if x]) > 1:
raise AirflowException(
- "The `keyfile_dict`, `key_path`, and `key_secret_name` fields "
- "are all mutually exclusive. Please provide only one value."
+ "The `keyfile_dict`, `key_path`, `credential_config_file`,
`is_anonymous` and"
+ " `key_secret_name` fields are all mutually exclusive. Please
provide only one value."
)
self.key_path = key_path
self.keyfile_dict = keyfile_dict
@@ -224,43 +228,48 @@ class _CredentialProvider(LoggingMixin):
self.disable_logging = disable_logging
self.target_principal = target_principal
self.delegates = delegates
+ self.is_anonymous = is_anonymous
- def get_credentials_and_project(self) ->
tuple[google.auth.credentials.Credentials, str]:
+ def get_credentials_and_project(self) -> tuple[Credentials, str]:
"""
Get current credentials and project ID.
+ Project ID is an empty string when using anonymous credentials.
+
:return: Google Auth Credentials
"""
- if self.key_path:
- credentials, project_id = self._get_credentials_using_key_path()
- elif self.key_secret_name:
- credentials, project_id =
self._get_credentials_using_key_secret_name()
- elif self.keyfile_dict:
- credentials, project_id =
self._get_credentials_using_keyfile_dict()
- elif self.credential_config_file:
- credentials, project_id =
self._get_credentials_using_credential_config_file()
+ if self.is_anonymous:
+ credentials, project_id = AnonymousCredentials(), ""
else:
- credentials, project_id = self._get_credentials_using_adc()
-
- if self.delegate_to:
- if hasattr(credentials, "with_subject"):
- credentials = credentials.with_subject(self.delegate_to)
+ if self.key_path:
+ credentials, project_id =
self._get_credentials_using_key_path()
+ elif self.key_secret_name:
+ credentials, project_id =
self._get_credentials_using_key_secret_name()
+ elif self.keyfile_dict:
+ credentials, project_id =
self._get_credentials_using_keyfile_dict()
+ elif self.credential_config_file:
+ credentials, project_id =
self._get_credentials_using_credential_config_file()
else:
- raise AirflowException(
- "The `delegate_to` parameter cannot be used here as the
current "
- "authentication method does not support account
impersonate. "
- "Please use service-account for authorization."
+ credentials, project_id = self._get_credentials_using_adc()
+ if self.delegate_to:
+ if hasattr(credentials, "with_subject"):
+ credentials = credentials.with_subject(self.delegate_to)
+ else:
+ raise AirflowException(
+ "The `delegate_to` parameter cannot be used here as
the current "
+ "authentication method does not support account
impersonate. "
+ "Please use service-account for authorization."
+ )
+
+ if self.target_principal:
+ credentials = impersonated_credentials.Credentials(
+ source_credentials=credentials,
+ target_principal=self.target_principal,
+ delegates=self.delegates,
+ target_scopes=self.scopes,
)
- if self.target_principal:
- credentials = impersonated_credentials.Credentials(
- source_credentials=credentials,
- target_principal=self.target_principal,
- delegates=self.delegates,
- target_scopes=self.scopes,
- )
-
- project_id =
_get_project_id_from_service_account_email(self.target_principal)
+ project_id =
_get_project_id_from_service_account_email(self.target_principal)
return credentials, project_id
@@ -357,7 +366,7 @@ class _CredentialProvider(LoggingMixin):
self.log.debug(*args, **kwargs)
-def get_credentials_and_project_id(*args, **kwargs) ->
tuple[google.auth.credentials.Credentials, str]:
+def get_credentials_and_project_id(*args, **kwargs) -> tuple[Credentials, str]:
"""Return the Credentials object for Google API and the associated
project_id."""
return _CredentialProvider(*args, **kwargs).get_credentials_and_project()
diff --git a/airflow/providers/google/common/hooks/base_google.py
b/airflow/providers/google/common/hooks/base_google.py
index 5800f8e44c..0de39330a9 100644
--- a/airflow/providers/google/common/hooks/base_google.py
+++ b/airflow/providers/google/common/hooks/base_google.py
@@ -30,7 +30,6 @@ from subprocess import check_output
from typing import TYPE_CHECKING, Any, Callable, Generator, Sequence, TypeVar,
cast
import google.auth
-import google.auth.credentials
import google.oauth2.service_account
import google_auth_httplib2
import requests
@@ -223,7 +222,7 @@ class GoogleBaseHook(BaseHook):
"""Return connection widgets to add to connection form."""
from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget,
BS3TextFieldWidget
from flask_babel import lazy_gettext
- from wtforms import IntegerField, PasswordField, StringField
+ from wtforms import BooleanField, IntegerField, PasswordField,
StringField
from wtforms.validators import NumberRange
return {
@@ -249,6 +248,9 @@ class GoogleBaseHook(BaseHook):
"impersonation_chain": StringField(
lazy_gettext("Impersonation Chain"),
widget=BS3TextFieldWidget()
),
+ "is_anonymous": BooleanField(
+ lazy_gettext("Anonymous credentials (ignores all other
settings)"), default=False
+ ),
}
@classmethod
@@ -270,10 +272,10 @@ class GoogleBaseHook(BaseHook):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
self.extras: dict = self.get_connection(self.gcp_conn_id).extra_dejson
- self._cached_credentials: google.auth.credentials.Credentials | None =
None
+ self._cached_credentials: Credentials | None = None
self._cached_project_id: str | None = None
- def get_credentials_and_project_id(self) ->
tuple[google.auth.credentials.Credentials, str | None]:
+ def get_credentials_and_project_id(self) -> tuple[Credentials, str | None]:
"""Return the Credentials object for Google API and the associated
project_id."""
if self._cached_credentials is not None:
return self._cached_credentials, self._cached_project_id
@@ -301,6 +303,7 @@ class GoogleBaseHook(BaseHook):
self.impersonation_chain = [s.strip() for s in
self.impersonation_chain.split(",")]
target_principal, delegates =
_get_target_principal_and_delegates(self.impersonation_chain)
+ is_anonymous = self._get_field("is_anonymous")
credentials, project_id = get_credentials_and_project_id(
key_path=key_path,
@@ -312,6 +315,7 @@ class GoogleBaseHook(BaseHook):
delegate_to=self.delegate_to,
target_principal=target_principal,
delegates=delegates,
+ is_anonymous=is_anonymous,
)
overridden_project_id = self._get_field("project")
@@ -323,7 +327,7 @@ class GoogleBaseHook(BaseHook):
return credentials, project_id
- def get_credentials(self) -> google.auth.credentials.Credentials:
+ def get_credentials(self) -> Credentials:
"""Return the Credentials object for Google API."""
credentials, _ = self.get_credentials_and_project_id()
return credentials
@@ -655,6 +659,8 @@ class GoogleBaseHook(BaseHook):
def test_connection(self):
"""Test the Google cloud connectivity from UI."""
status, message = False, ""
+ if self._get_field("is_anonymous"):
+ return True, "Credentials are anonymous"
try:
token = self._get_access_token()
url =
f"https://www.googleapis.com/oauth2/v3/tokeninfo?access_token={token}"
diff --git a/tests/providers/google/cloud/utils/test_credentials_provider.py
b/tests/providers/google/cloud/utils/test_credentials_provider.py
index 812d9c5cd5..2fdf756e28 100644
--- a/tests/providers/google/cloud/utils/test_credentials_provider.py
+++ b/tests/providers/google/cloud/utils/test_credentials_provider.py
@@ -375,14 +375,14 @@ class TestGetGcpCredentialsAndProjectId:
get_credentials_and_project_id(key_secret_name="secret name")
def
test_get_credentials_and_project_id_with_mutually_exclusive_configuration(self):
- with pytest.raises(
- AirflowException,
- match=re.escape(
- "The `keyfile_dict`, `key_path`, and `key_secret_name` fields
are all mutually exclusive."
- ),
- ):
+ with pytest.raises(AirflowException, match="mutually exclusive."):
get_credentials_and_project_id(key_path="KEY.json",
keyfile_dict={"private_key": "PRIVATE_KEY"})
+
@mock.patch("airflow.providers.google.cloud.utils.credentials_provider.AnonymousCredentials")
+ def test_get_credentials_using_anonymous_credentials(self,
mock_anonymous_credentials):
+ result = get_credentials_and_project_id(is_anonymous=True)
+ assert result == (mock_anonymous_credentials.return_value, "")
+
@mock.patch("google.auth.default", return_value=("CREDENTIALS",
"PROJECT_ID"))
@mock.patch(
"google.oauth2.service_account.Credentials.from_service_account_info",
diff --git a/tests/providers/google/common/hooks/test_base_google.py
b/tests/providers/google/common/hooks/test_base_google.py
index 1a91f0742d..50874c1c36 100644
--- a/tests/providers/google/common/hooks/test_base_google.py
+++ b/tests/providers/google/common/hooks/test_base_google.py
@@ -412,6 +412,7 @@ class TestGoogleBaseHook:
delegate_to=None,
target_principal=None,
delegates=None,
+ is_anonymous=None,
)
assert ("CREDENTIALS", "PROJECT_ID") == result
@@ -449,6 +450,7 @@ class TestGoogleBaseHook:
delegate_to=None,
target_principal=None,
delegates=None,
+ is_anonymous=None,
)
assert (mock_credentials, "PROJECT_ID") == result
@@ -479,6 +481,7 @@ class TestGoogleBaseHook:
delegate_to=None,
target_principal=None,
delegates=None,
+ is_anonymous=None,
)
assert (mock_credentials, "PROJECT_ID") == result
@@ -499,6 +502,7 @@ class TestGoogleBaseHook:
delegate_to="USER",
target_principal=None,
delegates=None,
+ is_anonymous=None,
)
assert (mock_credentials, "PROJECT_ID") == result
@@ -535,6 +539,7 @@ class TestGoogleBaseHook:
delegate_to=None,
target_principal=None,
delegates=None,
+ is_anonymous=None,
)
assert ("CREDENTIALS", "SECOND_PROJECT_ID") == result
@@ -544,12 +549,7 @@ class TestGoogleBaseHook:
"key_path": "KEY_PATH",
"keyfile_dict": '{"KEY": "VALUE"}',
}
- with pytest.raises(
- AirflowException,
- match=re.escape(
- "The `keyfile_dict`, `key_path`, and `key_secret_name` fields
are all mutually exclusive. "
- ),
- ):
+ with pytest.raises(AirflowException, match="mutually exclusive"):
self.instance.get_credentials_and_project_id()
def test_get_credentials_and_project_id_with_invalid_keyfile_dict(self):
@@ -559,6 +559,25 @@ class TestGoogleBaseHook:
with pytest.raises(AirflowException, match=re.escape("Invalid key
JSON.")):
self.instance.get_credentials_and_project_id()
+ @mock.patch(MODULE_NAME + ".get_credentials_and_project_id",
return_value=("CREDENTIALS", ""))
+ def test_get_credentials_and_project_id_with_is_anonymous(self,
mock_get_creds_and_proj_id):
+ self.instance.extras = {
+ "is_anonymous": True,
+ }
+ self.instance.get_credentials_and_project_id()
+ mock_get_creds_and_proj_id.assert_called_once_with(
+ key_path=None,
+ keyfile_dict=None,
+ credential_config_file=None,
+ key_secret_name=None,
+ key_secret_project_id=None,
+ scopes=self.instance.scopes,
+ delegate_to=None,
+ target_principal=None,
+ delegates=None,
+ is_anonymous=True,
+ )
+
@pytest.mark.skipif(
not default_creds_available, reason="Default Google Cloud credentials
not available to run tests"
)
@@ -764,6 +783,7 @@ class TestGoogleBaseHook:
delegate_to=None,
target_principal=target_principal,
delegates=delegates,
+ is_anonymous=None,
)
assert (mock_credentials, PROJECT_ID) == result