This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 27b3a22e34 Introduce anonymous credentials in GCP base hook (#39695)
27b3a22e34 is described below

commit 27b3a22e341468855c4ef368015ad946a59aa2e3
Author: Shahar Epstein <[email protected]>
AuthorDate: Mon May 20 00:20:38 2024 +0300

    Introduce anonymous credentials in GCP base hook (#39695)
---
 .../google/cloud/utils/credentials_provider.py     | 73 ++++++++++++----------
 .../providers/google/common/hooks/base_google.py   | 16 +++--
 .../cloud/utils/test_credentials_provider.py       | 12 ++--
 .../google/common/hooks/test_base_google.py        | 32 ++++++++--
 4 files changed, 84 insertions(+), 49 deletions(-)

diff --git a/airflow/providers/google/cloud/utils/credentials_provider.py 
b/airflow/providers/google/cloud/utils/credentials_provider.py
index 06f2b5c4d8..b3140c2da0 100644
--- a/airflow/providers/google/cloud/utils/credentials_provider.py
+++ b/airflow/providers/google/cloud/utils/credentials_provider.py
@@ -28,9 +28,9 @@ from typing import Collection, Generator, Sequence
 from urllib.parse import urlencode
 
 import google.auth
-import google.auth.credentials
 import google.oauth2.service_account
 from google.auth import impersonated_credentials  # type: ignore[attr-defined]
+from google.auth.credentials import AnonymousCredentials, Credentials
 from google.auth.environment_vars import CREDENTIALS, LEGACY_PROJECT, PROJECT
 
 from airflow.exceptions import AirflowException
@@ -178,6 +178,7 @@ class _CredentialProvider(LoggingMixin):
     :param key_secret_name: Keyfile Secret Name in GCP Secret Manager.
     :param key_secret_project_id: Project ID to read the secrets from. If not 
passed, the project ID from
         default credentials will be used.
+    :param credential_config_file: File path to or content of a GCP credential 
configuration file.
     :param scopes:  OAuth scopes for the connection
     :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
         if any. For this to work, the service account making the request must 
have
@@ -192,6 +193,8 @@ class _CredentialProvider(LoggingMixin):
         Service Account Token Creator IAM role to the directly preceding 
identity, with first
         account from the list granting this role to the originating account 
and target_principal
         granting the role to the last account from the list.
+    :param is_anonymous: Provides an anonymous set of credentials,
+        which is useful for APIs which do not require authentication.
     """
 
     def __init__(
@@ -206,13 +209,14 @@ class _CredentialProvider(LoggingMixin):
         disable_logging: bool = False,
         target_principal: str | None = None,
         delegates: Sequence[str] | None = None,
+        is_anonymous: bool | None = None,
     ) -> None:
         super().__init__()
-        key_options = [key_path, key_secret_name, keyfile_dict]
+        key_options = [key_path, keyfile_dict, credential_config_file, 
key_secret_name, is_anonymous]
         if len([x for x in key_options if x]) > 1:
             raise AirflowException(
-                "The `keyfile_dict`, `key_path`, and `key_secret_name` fields "
-                "are all mutually exclusive. Please provide only one value."
+                "The `keyfile_dict`, `key_path`, `credential_config_file`, 
`is_anonymous` and"
+                " `key_secret_name` fields are all mutually exclusive. Please 
provide only one value."
             )
         self.key_path = key_path
         self.keyfile_dict = keyfile_dict
@@ -224,43 +228,48 @@ class _CredentialProvider(LoggingMixin):
         self.disable_logging = disable_logging
         self.target_principal = target_principal
         self.delegates = delegates
+        self.is_anonymous = is_anonymous
 
-    def get_credentials_and_project(self) -> 
tuple[google.auth.credentials.Credentials, str]:
+    def get_credentials_and_project(self) -> tuple[Credentials, str]:
         """
         Get current credentials and project ID.
 
+        Project ID is an empty string when using anonymous credentials.
+
         :return: Google Auth Credentials
         """
-        if self.key_path:
-            credentials, project_id = self._get_credentials_using_key_path()
-        elif self.key_secret_name:
-            credentials, project_id = 
self._get_credentials_using_key_secret_name()
-        elif self.keyfile_dict:
-            credentials, project_id = 
self._get_credentials_using_keyfile_dict()
-        elif self.credential_config_file:
-            credentials, project_id = 
self._get_credentials_using_credential_config_file()
+        if self.is_anonymous:
+            credentials, project_id = AnonymousCredentials(), ""
         else:
-            credentials, project_id = self._get_credentials_using_adc()
-
-        if self.delegate_to:
-            if hasattr(credentials, "with_subject"):
-                credentials = credentials.with_subject(self.delegate_to)
+            if self.key_path:
+                credentials, project_id = 
self._get_credentials_using_key_path()
+            elif self.key_secret_name:
+                credentials, project_id = 
self._get_credentials_using_key_secret_name()
+            elif self.keyfile_dict:
+                credentials, project_id = 
self._get_credentials_using_keyfile_dict()
+            elif self.credential_config_file:
+                credentials, project_id = 
self._get_credentials_using_credential_config_file()
             else:
-                raise AirflowException(
-                    "The `delegate_to` parameter cannot be used here as the 
current "
-                    "authentication method does not support account 
impersonate. "
-                    "Please use service-account for authorization."
+                credentials, project_id = self._get_credentials_using_adc()
+            if self.delegate_to:
+                if hasattr(credentials, "with_subject"):
+                    credentials = credentials.with_subject(self.delegate_to)
+                else:
+                    raise AirflowException(
+                        "The `delegate_to` parameter cannot be used here as 
the current "
+                        "authentication method does not support account 
impersonate. "
+                        "Please use service-account for authorization."
+                    )
+
+            if self.target_principal:
+                credentials = impersonated_credentials.Credentials(
+                    source_credentials=credentials,
+                    target_principal=self.target_principal,
+                    delegates=self.delegates,
+                    target_scopes=self.scopes,
                 )
 
-        if self.target_principal:
-            credentials = impersonated_credentials.Credentials(
-                source_credentials=credentials,
-                target_principal=self.target_principal,
-                delegates=self.delegates,
-                target_scopes=self.scopes,
-            )
-
-            project_id = 
_get_project_id_from_service_account_email(self.target_principal)
+                project_id = 
_get_project_id_from_service_account_email(self.target_principal)
 
         return credentials, project_id
 
@@ -357,7 +366,7 @@ class _CredentialProvider(LoggingMixin):
             self.log.debug(*args, **kwargs)
 
 
-def get_credentials_and_project_id(*args, **kwargs) -> 
tuple[google.auth.credentials.Credentials, str]:
+def get_credentials_and_project_id(*args, **kwargs) -> tuple[Credentials, str]:
     """Return the Credentials object for Google API and the associated 
project_id."""
     return _CredentialProvider(*args, **kwargs).get_credentials_and_project()
 
diff --git a/airflow/providers/google/common/hooks/base_google.py 
b/airflow/providers/google/common/hooks/base_google.py
index 5800f8e44c..0de39330a9 100644
--- a/airflow/providers/google/common/hooks/base_google.py
+++ b/airflow/providers/google/common/hooks/base_google.py
@@ -30,7 +30,6 @@ from subprocess import check_output
 from typing import TYPE_CHECKING, Any, Callable, Generator, Sequence, TypeVar, 
cast
 
 import google.auth
-import google.auth.credentials
 import google.oauth2.service_account
 import google_auth_httplib2
 import requests
@@ -223,7 +222,7 @@ class GoogleBaseHook(BaseHook):
         """Return connection widgets to add to connection form."""
         from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget, 
BS3TextFieldWidget
         from flask_babel import lazy_gettext
-        from wtforms import IntegerField, PasswordField, StringField
+        from wtforms import BooleanField, IntegerField, PasswordField, 
StringField
         from wtforms.validators import NumberRange
 
         return {
@@ -249,6 +248,9 @@ class GoogleBaseHook(BaseHook):
             "impersonation_chain": StringField(
                 lazy_gettext("Impersonation Chain"), 
widget=BS3TextFieldWidget()
             ),
+            "is_anonymous": BooleanField(
+                lazy_gettext("Anonymous credentials (ignores all other 
settings)"), default=False
+            ),
         }
 
     @classmethod
@@ -270,10 +272,10 @@ class GoogleBaseHook(BaseHook):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
         self.extras: dict = self.get_connection(self.gcp_conn_id).extra_dejson
-        self._cached_credentials: google.auth.credentials.Credentials | None = 
None
+        self._cached_credentials: Credentials | None = None
         self._cached_project_id: str | None = None
 
-    def get_credentials_and_project_id(self) -> 
tuple[google.auth.credentials.Credentials, str | None]:
+    def get_credentials_and_project_id(self) -> tuple[Credentials, str | None]:
         """Return the Credentials object for Google API and the associated 
project_id."""
         if self._cached_credentials is not None:
             return self._cached_credentials, self._cached_project_id
@@ -301,6 +303,7 @@ class GoogleBaseHook(BaseHook):
                 self.impersonation_chain = [s.strip() for s in 
self.impersonation_chain.split(",")]
 
         target_principal, delegates = 
_get_target_principal_and_delegates(self.impersonation_chain)
+        is_anonymous = self._get_field("is_anonymous")
 
         credentials, project_id = get_credentials_and_project_id(
             key_path=key_path,
@@ -312,6 +315,7 @@ class GoogleBaseHook(BaseHook):
             delegate_to=self.delegate_to,
             target_principal=target_principal,
             delegates=delegates,
+            is_anonymous=is_anonymous,
         )
 
         overridden_project_id = self._get_field("project")
@@ -323,7 +327,7 @@ class GoogleBaseHook(BaseHook):
 
         return credentials, project_id
 
-    def get_credentials(self) -> google.auth.credentials.Credentials:
+    def get_credentials(self) -> Credentials:
         """Return the Credentials object for Google API."""
         credentials, _ = self.get_credentials_and_project_id()
         return credentials
@@ -655,6 +659,8 @@ class GoogleBaseHook(BaseHook):
     def test_connection(self):
         """Test the Google cloud connectivity from UI."""
         status, message = False, ""
+        if self._get_field("is_anonymous"):
+            return True, "Credentials are anonymous"
         try:
             token = self._get_access_token()
             url = 
f"https://www.googleapis.com/oauth2/v3/tokeninfo?access_token={token}";
diff --git a/tests/providers/google/cloud/utils/test_credentials_provider.py 
b/tests/providers/google/cloud/utils/test_credentials_provider.py
index 812d9c5cd5..2fdf756e28 100644
--- a/tests/providers/google/cloud/utils/test_credentials_provider.py
+++ b/tests/providers/google/cloud/utils/test_credentials_provider.py
@@ -375,14 +375,14 @@ class TestGetGcpCredentialsAndProjectId:
             get_credentials_and_project_id(key_secret_name="secret name")
 
     def 
test_get_credentials_and_project_id_with_mutually_exclusive_configuration(self):
-        with pytest.raises(
-            AirflowException,
-            match=re.escape(
-                "The `keyfile_dict`, `key_path`, and `key_secret_name` fields 
are all mutually exclusive."
-            ),
-        ):
+        with pytest.raises(AirflowException, match="mutually exclusive."):
             get_credentials_and_project_id(key_path="KEY.json", 
keyfile_dict={"private_key": "PRIVATE_KEY"})
 
+    
@mock.patch("airflow.providers.google.cloud.utils.credentials_provider.AnonymousCredentials")
+    def test_get_credentials_using_anonymous_credentials(self, 
mock_anonymous_credentials):
+        result = get_credentials_and_project_id(is_anonymous=True)
+        assert result == (mock_anonymous_credentials.return_value, "")
+
     @mock.patch("google.auth.default", return_value=("CREDENTIALS", 
"PROJECT_ID"))
     @mock.patch(
         "google.oauth2.service_account.Credentials.from_service_account_info",
diff --git a/tests/providers/google/common/hooks/test_base_google.py 
b/tests/providers/google/common/hooks/test_base_google.py
index 1a91f0742d..50874c1c36 100644
--- a/tests/providers/google/common/hooks/test_base_google.py
+++ b/tests/providers/google/common/hooks/test_base_google.py
@@ -412,6 +412,7 @@ class TestGoogleBaseHook:
             delegate_to=None,
             target_principal=None,
             delegates=None,
+            is_anonymous=None,
         )
         assert ("CREDENTIALS", "PROJECT_ID") == result
 
@@ -449,6 +450,7 @@ class TestGoogleBaseHook:
             delegate_to=None,
             target_principal=None,
             delegates=None,
+            is_anonymous=None,
         )
         assert (mock_credentials, "PROJECT_ID") == result
 
@@ -479,6 +481,7 @@ class TestGoogleBaseHook:
             delegate_to=None,
             target_principal=None,
             delegates=None,
+            is_anonymous=None,
         )
         assert (mock_credentials, "PROJECT_ID") == result
 
@@ -499,6 +502,7 @@ class TestGoogleBaseHook:
             delegate_to="USER",
             target_principal=None,
             delegates=None,
+            is_anonymous=None,
         )
         assert (mock_credentials, "PROJECT_ID") == result
 
@@ -535,6 +539,7 @@ class TestGoogleBaseHook:
             delegate_to=None,
             target_principal=None,
             delegates=None,
+            is_anonymous=None,
         )
         assert ("CREDENTIALS", "SECOND_PROJECT_ID") == result
 
@@ -544,12 +549,7 @@ class TestGoogleBaseHook:
             "key_path": "KEY_PATH",
             "keyfile_dict": '{"KEY": "VALUE"}',
         }
-        with pytest.raises(
-            AirflowException,
-            match=re.escape(
-                "The `keyfile_dict`, `key_path`, and `key_secret_name` fields 
are all mutually exclusive. "
-            ),
-        ):
+        with pytest.raises(AirflowException, match="mutually exclusive"):
             self.instance.get_credentials_and_project_id()
 
     def test_get_credentials_and_project_id_with_invalid_keyfile_dict(self):
@@ -559,6 +559,25 @@ class TestGoogleBaseHook:
         with pytest.raises(AirflowException, match=re.escape("Invalid key 
JSON.")):
             self.instance.get_credentials_and_project_id()
 
+    @mock.patch(MODULE_NAME + ".get_credentials_and_project_id", 
return_value=("CREDENTIALS", ""))
+    def test_get_credentials_and_project_id_with_is_anonymous(self, 
mock_get_creds_and_proj_id):
+        self.instance.extras = {
+            "is_anonymous": True,
+        }
+        self.instance.get_credentials_and_project_id()
+        mock_get_creds_and_proj_id.assert_called_once_with(
+            key_path=None,
+            keyfile_dict=None,
+            credential_config_file=None,
+            key_secret_name=None,
+            key_secret_project_id=None,
+            scopes=self.instance.scopes,
+            delegate_to=None,
+            target_principal=None,
+            delegates=None,
+            is_anonymous=True,
+        )
+
     @pytest.mark.skipif(
         not default_creds_available, reason="Default Google Cloud credentials 
not available to run tests"
     )
@@ -764,6 +783,7 @@ class TestGoogleBaseHook:
             delegate_to=None,
             target_principal=target_principal,
             delegates=delegates,
+            is_anonymous=None,
         )
         assert (mock_credentials, PROJECT_ID) == result
 

Reply via email to