This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 5eacc16 Add support for impersonation in GCP hooks (#9915)
5eacc16 is described below
commit 5eacc164201a121cd06126aff613cbe0919d35cc
Author: Kamil Olszewski <[email protected]>
AuthorDate: Wed Jul 22 01:02:32 2020 +0200
Add support for impersonation in GCP hooks (#9915)
Co-authored-by: Kamil Olszewski <[email protected]>
---
airflow/providers/google/cloud/hooks/automl.py | 13 +++-
airflow/providers/google/cloud/hooks/bigquery.py | 6 +-
.../providers/google/cloud/hooks/bigquery_dts.py | 11 ++-
airflow/providers/google/cloud/hooks/bigtable.py | 15 +++-
.../providers/google/cloud/hooks/cloud_build.py | 25 ++++--
.../google/cloud/hooks/cloud_memorystore.py | 26 ++++++-
airflow/providers/google/cloud/hooks/cloud_sql.py | 13 +++-
.../cloud/hooks/cloud_storage_transfer_service.py | 13 +++-
airflow/providers/google/cloud/hooks/compute.py | 11 ++-
.../providers/google/cloud/hooks/datacatalog.py | 26 ++++++-
airflow/providers/google/cloud/hooks/dataflow.py | 11 ++-
airflow/providers/google/cloud/hooks/datafusion.py | 9 ++-
airflow/providers/google/cloud/hooks/datastore.py | 11 ++-
airflow/providers/google/cloud/hooks/dlp.py | 26 ++++++-
airflow/providers/google/cloud/hooks/functions.py | 13 +++-
airflow/providers/google/cloud/hooks/gcs.py | 15 ++--
airflow/providers/google/cloud/hooks/gdm.py | 15 +++-
airflow/providers/google/cloud/hooks/kms.py | 24 ++++--
.../google/cloud/hooks/kubernetes_engine.py | 10 ++-
.../providers/google/cloud/hooks/life_sciences.py | 24 ++++--
.../google/cloud/hooks/natural_language.py | 26 ++++++-
airflow/providers/google/cloud/hooks/pubsub.py | 13 +++-
.../providers/google/cloud/hooks/secret_manager.py | 24 ++++--
airflow/providers/google/cloud/hooks/spanner.py | 15 +++-
.../providers/google/cloud/hooks/speech_to_text.py | 28 +++++--
.../providers/google/cloud/hooks/stackdriver.py | 15 +++-
airflow/providers/google/cloud/hooks/tasks.py | 26 ++++++-
.../providers/google/cloud/hooks/text_to_speech.py | 28 +++++--
airflow/providers/google/cloud/hooks/translate.py | 15 +++-
.../google/cloud/hooks/video_intelligence.py | 26 ++++++-
airflow/providers/google/cloud/hooks/vision.py | 13 +++-
.../google/cloud/utils/credentials_provider.py | 55 +++++++++++++-
.../providers/google/common/hooks/base_google.py | 31 ++++++--
.../providers/google/common/hooks/discovery_api.py | 26 +++++--
.../providers/google/firebase/hooks/firestore.py | 22 +++++-
.../marketing_platform/hooks/campaign_manager.py | 9 ++-
.../marketing_platform/hooks/display_video.py | 9 ++-
.../google/marketing_platform/hooks/search_ads.py | 9 ++-
airflow/providers/google/suite/hooks/drive.py | 24 ++++--
airflow/providers/google/suite/hooks/sheets.py | 24 ++++--
.../providers/google/cloud/hooks/test_bigquery.py | 6 +-
.../providers/google/cloud/hooks/test_dataflow.py | 7 +-
.../providers/google/cloud/hooks/test_datastore.py | 7 +-
tests/providers/google/cloud/hooks/test_gdm.py | 7 +-
tests/providers/google/cloud/hooks/test_kms.py | 7 +-
tests/providers/google/cloud/hooks/test_pubsub.py | 7 +-
.../providers/google/cloud/utils/base_gcp_mock.py | 16 +++-
.../cloud/utils/test_credentials_provider.py | 88 +++++++++++++++++++++-
.../google/common/hooks/test_base_google.py | 52 +++++++++++--
49 files changed, 791 insertions(+), 161 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/automl.py
b/airflow/providers/google/cloud/hooks/automl.py
index 5c2ea89..a70b599 100644
--- a/airflow/providers/google/cloud/hooks/automl.py
+++ b/airflow/providers/google/cloud/hooks/automl.py
@@ -41,9 +41,16 @@ class CloudAutoMLHook(GoogleBaseHook):
"""
def __init__(
- self, gcp_conn_id: str = "google_cloud_default", delegate_to:
Optional[str] = None
- ):
- super().__init__(gcp_conn_id, delegate_to)
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._client = None # type: Optional[AutoMlClient]
@staticmethod
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py
b/airflow/providers/google/cloud/hooks/bigquery.py
index efdd6c3..cd186b8 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -61,6 +61,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
def __init__(self,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] =
None,
use_legacy_sql: bool = True,
location: Optional[str] = None,
bigquery_conn_id: Optional[str] = None,
@@ -73,7 +74,10 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
"the gcp_conn_id parameter.", DeprecationWarning, stacklevel=2)
gcp_conn_id = bigquery_conn_id
super().__init__(
- gcp_conn_id=gcp_conn_id, delegate_to=delegate_to)
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self.use_legacy_sql = use_legacy_sql
self.location = location
self.running_job_id = None # type: Optional[str]
diff --git a/airflow/providers/google/cloud/hooks/bigquery_dts.py
b/airflow/providers/google/cloud/hooks/bigquery_dts.py
index bd39bb5..efea859 100644
--- a/airflow/providers/google/cloud/hooks/bigquery_dts.py
+++ b/airflow/providers/google/cloud/hooks/bigquery_dts.py
@@ -51,9 +51,16 @@ class BiqQueryDataTransferServiceHook(GoogleBaseHook):
_conn = None # type: Optional[Resource]
def __init__(
- self, gcp_conn_id: str = "google_cloud_default", delegate_to:
Optional[str] = None
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id=gcp_conn_id, delegate_to=delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
@staticmethod
def _disable_auto_scheduling(config: Union[dict, TransferConfig]) ->
TransferConfig:
diff --git a/airflow/providers/google/cloud/hooks/bigtable.py
b/airflow/providers/google/cloud/hooks/bigtable.py
index 49188a1..0627fcc 100644
--- a/airflow/providers/google/cloud/hooks/bigtable.py
+++ b/airflow/providers/google/cloud/hooks/bigtable.py
@@ -18,7 +18,7 @@
"""
This module contains a Google Cloud Bigtable Hook.
"""
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Sequence, Union
from google.cloud.bigtable import Client
from google.cloud.bigtable.cluster import Cluster
@@ -39,8 +39,17 @@ class BigtableHook(GoogleBaseHook):
"""
# pylint: disable=too-many-arguments
- def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to:
Optional[str] = None) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ def __init__(
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._client = None
def _get_client(self, project_id: str):
diff --git a/airflow/providers/google/cloud/hooks/cloud_build.py
b/airflow/providers/google/cloud/hooks/cloud_build.py
index 85123f1..db9a3eb 100644
--- a/airflow/providers/google/cloud/hooks/cloud_build.py
+++ b/airflow/providers/google/cloud/hooks/cloud_build.py
@@ -18,7 +18,7 @@
"""Hook for Google Cloud Build service"""
import time
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Sequence, Union
from googleapiclient.discovery import build
@@ -41,10 +41,19 @@ class CloudBuildHook(GoogleBaseHook):
:type api_version: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
_conn = None # type: Optional[Any]
@@ -53,9 +62,15 @@ class CloudBuildHook(GoogleBaseHook):
self,
api_version: str = "v1",
gcp_conn_id: str = "google_cloud_default",
- delegate_to: Optional[str] = None
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
+
self.api_version = api_version
def get_conn(self):
diff --git a/airflow/providers/google/cloud/hooks/cloud_memorystore.py
b/airflow/providers/google/cloud/hooks/cloud_memorystore.py
index 64ef45a..bb37e4a 100644
--- a/airflow/providers/google/cloud/hooks/cloud_memorystore.py
+++ b/airflow/providers/google/cloud/hooks/cloud_memorystore.py
@@ -41,14 +41,32 @@ class CloudMemorystoreHook(GoogleBaseHook):
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
- def __init__(self, gcp_conn_id: str = "google_cloud_default", delegate_to:
Optional[str] = None):
- super().__init__(gcp_conn_id, delegate_to)
+ def __init__(
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._client = None # type: Optional[CloudRedisClient]
def get_conn(self,):
diff --git a/airflow/providers/google/cloud/hooks/cloud_sql.py
b/airflow/providers/google/cloud/hooks/cloud_sql.py
index 2da321e..d6ef47e 100644
--- a/airflow/providers/google/cloud/hooks/cloud_sql.py
+++ b/airflow/providers/google/cloud/hooks/cloud_sql.py
@@ -34,7 +34,7 @@ import subprocess
import time
import uuid
from subprocess import PIPE, Popen
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, Sequence, Union
from urllib.parse import quote_plus
import requests
@@ -80,10 +80,15 @@ class CloudSQLHook(GoogleBaseHook):
def __init__(
self,
api_version: str,
- gcp_conn_id: str = 'google_cloud_default',
- delegate_to: Optional[str] = None
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self.api_version = api_version
self._conn = None
diff --git
a/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
b/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
index d901e5f..ec31ed2 100644
--- a/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
+++ b/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
@@ -25,7 +25,7 @@ import time
import warnings
from copy import deepcopy
from datetime import timedelta
-from typing import Dict, List, Optional, Set, Union
+from typing import Dict, List, Optional, Sequence, Set, Union
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
@@ -129,10 +129,15 @@ class CloudDataTransferServiceHook(GoogleBaseHook):
def __init__(
self,
api_version: str = 'v1',
- gcp_conn_id: str = 'google_cloud_default',
- delegate_to: Optional[str] = None
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self.api_version = api_version
self._conn = None
diff --git a/airflow/providers/google/cloud/hooks/compute.py
b/airflow/providers/google/cloud/hooks/compute.py
index e3075fa..953caf6 100644
--- a/airflow/providers/google/cloud/hooks/compute.py
+++ b/airflow/providers/google/cloud/hooks/compute.py
@@ -20,7 +20,7 @@ This module contains a Google Compute Engine Hook.
"""
import time
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Sequence, Union
from googleapiclient.discovery import build
@@ -54,9 +54,14 @@ class ComputeEngineHook(GoogleBaseHook):
self,
api_version: str = 'v1',
gcp_conn_id: str = 'google_cloud_default',
- delegate_to: Optional[str] = None
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self.api_version = api_version
def get_conn(self):
diff --git a/airflow/providers/google/cloud/hooks/datacatalog.py
b/airflow/providers/google/cloud/hooks/datacatalog.py
index 49b4c16..5c02118 100644
--- a/airflow/providers/google/cloud/hooks/datacatalog.py
+++ b/airflow/providers/google/cloud/hooks/datacatalog.py
@@ -33,14 +33,32 @@ class CloudDataCatalogHook(GoogleBaseHook):
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
- def __init__(self, gcp_conn_id: str = "google_cloud_default", delegate_to:
Optional[str] = None) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ def __init__(
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._client: Optional[DataCatalogClient] = None
def get_conn(self) -> DataCatalogClient:
diff --git a/airflow/providers/google/cloud/hooks/dataflow.py
b/airflow/providers/google/cloud/hooks/dataflow.py
index ee3b3f6..7b21c2c 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -29,7 +29,7 @@ import uuid
import warnings
from copy import deepcopy
from tempfile import TemporaryDirectory
-from typing import Any, Callable, Dict, List, Optional, TypeVar, cast
+from typing import Any, Callable, Dict, List, Optional, Sequence, TypeVar,
Union, cast
from googleapiclient.discovery import build
@@ -413,12 +413,17 @@ class DataflowHook(GoogleBaseHook):
def __init__(
self,
- gcp_conn_id: str = 'google_cloud_default',
+ gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
poll_sleep: int = 10
) -> None:
self.poll_sleep = poll_sleep
- super().__init__(gcp_conn_id, delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
def get_conn(self):
"""
diff --git a/airflow/providers/google/cloud/hooks/datafusion.py
b/airflow/providers/google/cloud/hooks/datafusion.py
index 4015d4a..50d552c 100644
--- a/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/airflow/providers/google/cloud/hooks/datafusion.py
@@ -21,7 +21,7 @@ This module contains Google DataFusion hook.
import json
import os
from time import monotonic, sleep
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, Sequence, Union
from urllib.parse import quote, urlencode
import google.auth
@@ -64,8 +64,13 @@ class DataFusionHook(GoogleBaseHook):
api_version: str = "v1beta1",
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self.api_version = api_version
def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
diff --git a/airflow/providers/google/cloud/hooks/datastore.py
b/airflow/providers/google/cloud/hooks/datastore.py
index 7da30af..0dcf7f3 100644
--- a/airflow/providers/google/cloud/hooks/datastore.py
+++ b/airflow/providers/google/cloud/hooks/datastore.py
@@ -22,7 +22,7 @@ This module contains Google Datastore hook.
import time
import warnings
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, Sequence, Union
from googleapiclient.discovery import build
@@ -42,8 +42,9 @@ class DatastoreHook(GoogleBaseHook):
def __init__(
self,
- gcp_conn_id: str = 'google_cloud_default',
+ gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
api_version: str = 'v1',
datastore_conn_id: Optional[str] = None
) -> None:
@@ -52,7 +53,11 @@ class DatastoreHook(GoogleBaseHook):
"The datastore_conn_id parameter has been deprecated. You
should pass "
"the gcp_conn_id parameter.", DeprecationWarning, stacklevel=2)
gcp_conn_id = datastore_conn_id
- super().__init__(gcp_conn_id=gcp_conn_id, delegate_to=delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self.connection = None
self.api_version = api_version
diff --git a/airflow/providers/google/cloud/hooks/dlp.py
b/airflow/providers/google/cloud/hooks/dlp.py
index 79d2023..a8dcdce 100644
--- a/airflow/providers/google/cloud/hooks/dlp.py
+++ b/airflow/providers/google/cloud/hooks/dlp.py
@@ -52,14 +52,32 @@ class CloudDLPHook(GoogleBaseHook):
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
- def __init__(self, gcp_conn_id: str = "google_cloud_default", delegate_to:
Optional[str] = None) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ def __init__(
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._client = None
def get_conn(self) -> DlpServiceClient:
diff --git a/airflow/providers/google/cloud/hooks/functions.py
b/airflow/providers/google/cloud/hooks/functions.py
index 9d7439d..84328f0 100644
--- a/airflow/providers/google/cloud/hooks/functions.py
+++ b/airflow/providers/google/cloud/hooks/functions.py
@@ -19,7 +19,7 @@
This module contains a Google Cloud Functions Hook.
"""
import time
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Sequence, Union
import requests
from googleapiclient.discovery import build
@@ -44,10 +44,15 @@ class CloudFunctionsHook(GoogleBaseHook):
def __init__(
self,
api_version: str,
- gcp_conn_id: str = 'google_cloud_default',
- delegate_to: Optional[str] = None
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self.api_version = api_version
@staticmethod
diff --git a/airflow/providers/google/cloud/hooks/gcs.py
b/airflow/providers/google/cloud/hooks/gcs.py
index 94523ff..949937b 100644
--- a/airflow/providers/google/cloud/hooks/gcs.py
+++ b/airflow/providers/google/cloud/hooks/gcs.py
@@ -28,7 +28,7 @@ from contextlib import contextmanager
from io import BytesIO
from os import path
from tempfile import NamedTemporaryFile
-from typing import Callable, Optional, Set, Tuple, TypeVar, Union, cast
+from typing import Callable, Optional, Sequence, Set, Tuple, TypeVar, Union,
cast
from urllib.parse import urlparse
from google.api_core.exceptions import NotFound
@@ -114,9 +114,10 @@ class GCSHook(GoogleBaseHook):
def __init__(
self,
- gcp_conn_id: str = 'google_cloud_default',
- delegate_to: Optional[str] = None,
- google_cloud_storage_conn_id: Optional[str] = None
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ google_cloud_storage_conn_id: Optional[str] = None
) -> None:
# To preserve backward compatibility
# TODO: remove one day
@@ -125,7 +126,11 @@ class GCSHook(GoogleBaseHook):
"The google_cloud_storage_conn_id parameter has been
deprecated. You should pass "
"the gcp_conn_id parameter.", DeprecationWarning, stacklevel=2)
gcp_conn_id = google_cloud_storage_conn_id
- super().__init__(gcp_conn_id=gcp_conn_id, delegate_to=delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
def get_conn(self):
"""
diff --git a/airflow/providers/google/cloud/hooks/gdm.py
b/airflow/providers/google/cloud/hooks/gdm.py
index 6f5af2f..60cebbf 100644
--- a/airflow/providers/google/cloud/hooks/gdm.py
+++ b/airflow/providers/google/cloud/hooks/gdm.py
@@ -17,7 +17,7 @@
# under the License.
#
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Sequence, Union
from googleapiclient.discovery import build
@@ -31,8 +31,17 @@ class GoogleDeploymentManagerHook(GoogleBaseHook): #
pylint: disable=abstract-m
This allows for scheduled and programatic inspection and deletion fo
resources managed by GDM.
"""
- def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None):
- super(GoogleDeploymentManagerHook, self).__init__(gcp_conn_id,
delegate_to=delegate_to)
+ def __init__(
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
+ super(GoogleDeploymentManagerHook, self).__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
def get_conn(self):
"""
diff --git a/airflow/providers/google/cloud/hooks/kms.py
b/airflow/providers/google/cloud/hooks/kms.py
index 3309c46..419f3fd 100644
--- a/airflow/providers/google/cloud/hooks/kms.py
+++ b/airflow/providers/google/cloud/hooks/kms.py
@@ -22,7 +22,7 @@ This module contains a Google Cloud KMS hook.
import base64
-from typing import Optional, Sequence, Tuple
+from typing import Optional, Sequence, Tuple, Union
from google.api_core.retry import Retry
from google.cloud.kms_v1 import KeyManagementServiceClient
@@ -47,18 +47,32 @@ class CloudKMSHook(GoogleBaseHook):
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
- delegate_to: Optional[str] = None
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id=gcp_conn_id, delegate_to=delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._conn = None # type: Optional[KeyManagementServiceClient]
def get_conn(self) -> KeyManagementServiceClient:
diff --git a/airflow/providers/google/cloud/hooks/kubernetes_engine.py
b/airflow/providers/google/cloud/hooks/kubernetes_engine.py
index e2a3c14..d391a83 100644
--- a/airflow/providers/google/cloud/hooks/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/hooks/kubernetes_engine.py
@@ -22,7 +22,7 @@ This module contains a Google Kubernetes Engine Hook.
import time
import warnings
-from typing import Dict, Optional, Union
+from typing import Dict, Optional, Sequence, Union
from google.api_core.exceptions import AlreadyExists, NotFound
from google.api_core.gapic_v1.method import DEFAULT
@@ -49,12 +49,16 @@ class GKEHook(GoogleBaseHook):
def __init__(
self,
- gcp_conn_id: str = 'google_cloud_default',
+ gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
location: Optional[str] = None
) -> None:
super().__init__(
- gcp_conn_id=gcp_conn_id, delegate_to=delegate_to)
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._client = None
self.location = location
diff --git a/airflow/providers/google/cloud/hooks/life_sciences.py
b/airflow/providers/google/cloud/hooks/life_sciences.py
index 8c1e516..11631de 100644
--- a/airflow/providers/google/cloud/hooks/life_sciences.py
+++ b/airflow/providers/google/cloud/hooks/life_sciences.py
@@ -18,7 +18,7 @@
"""Hook for Google Cloud Life Sciences service"""
import time
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Sequence, Union
import google.api_core.path_template
from googleapiclient.discovery import build
@@ -42,10 +42,19 @@ class LifeSciencesHook(GoogleBaseHook):
:type api_version: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
_conn = None # type: Optional[Any]
@@ -54,9 +63,14 @@ class LifeSciencesHook(GoogleBaseHook):
self,
api_version: str = "v2beta",
gcp_conn_id: str = "google_cloud_default",
- delegate_to: Optional[str] = None
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self.api_version = api_version
def get_conn(self):
diff --git a/airflow/providers/google/cloud/hooks/natural_language.py
b/airflow/providers/google/cloud/hooks/natural_language.py
index 326c4bf..b0b8b20 100644
--- a/airflow/providers/google/cloud/hooks/natural_language.py
+++ b/airflow/providers/google/cloud/hooks/natural_language.py
@@ -37,14 +37,32 @@ class CloudNaturalLanguageHook(GoogleBaseHook):
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
- def __init__(self, gcp_conn_id: str = "google_cloud_default", delegate_to:
Optional[str] = None) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ def __init__(
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._conn = None
def get_conn(self) -> LanguageServiceClient:
diff --git a/airflow/providers/google/cloud/hooks/pubsub.py
b/airflow/providers/google/cloud/hooks/pubsub.py
index 7c8d6d3..026a116 100644
--- a/airflow/providers/google/cloud/hooks/pubsub.py
+++ b/airflow/providers/google/cloud/hooks/pubsub.py
@@ -50,8 +50,17 @@ class PubSubHook(GoogleBaseHook):
the project embedded in the Connection referenced by gcp_conn_id.
"""
- def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to:
Optional[str] = None) -> None:
- super().__init__(gcp_conn_id, delegate_to=delegate_to)
+ def __init__(
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._client = None
def get_conn(self) -> PublisherClient:
diff --git a/airflow/providers/google/cloud/hooks/secret_manager.py
b/airflow/providers/google/cloud/hooks/secret_manager.py
index 3471dd4..a58cf62 100644
--- a/airflow/providers/google/cloud/hooks/secret_manager.py
+++ b/airflow/providers/google/cloud/hooks/secret_manager.py
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""Hook for Secrets Manager service"""
-from typing import Optional
+from typing import Optional, Sequence, Union
from airflow.providers.google.cloud._internal_client.secret_manager_client
import _SecretManagerClient # noqa
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
@@ -34,17 +34,31 @@ class SecretsManagerHook(GoogleBaseHook):
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
- delegate_to: Optional[str] = None
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self.client = _SecretManagerClient(credentials=self._get_credentials())
def get_conn(self) -> _SecretManagerClient:
diff --git a/airflow/providers/google/cloud/hooks/spanner.py
b/airflow/providers/google/cloud/hooks/spanner.py
index 0a03d19..bba2c04 100644
--- a/airflow/providers/google/cloud/hooks/spanner.py
+++ b/airflow/providers/google/cloud/hooks/spanner.py
@@ -18,7 +18,7 @@
"""
This module contains a Google Cloud Spanner Hook.
"""
-from typing import Callable, List, Optional
+from typing import Callable, List, Optional, Sequence, Union
from google.api_core.exceptions import AlreadyExists, GoogleAPICallError
from google.cloud.spanner_v1.client import Client
@@ -39,8 +39,17 @@ class SpannerHook(GoogleBaseHook):
keyword arguments rather than positional.
"""
- def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to:
Optional[str] = None) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ def __init__(
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._client = None
def _get_client(self, project_id: str) -> Client:
diff --git a/airflow/providers/google/cloud/hooks/speech_to_text.py
b/airflow/providers/google/cloud/hooks/speech_to_text.py
index 5cf6929..6cb943c 100644
--- a/airflow/providers/google/cloud/hooks/speech_to_text.py
+++ b/airflow/providers/google/cloud/hooks/speech_to_text.py
@@ -18,7 +18,7 @@
"""
This module contains a Google Cloud Speech Hook.
"""
-from typing import Dict, Optional, Union
+from typing import Dict, Optional, Sequence, Union
from google.api_core.retry import Retry
from google.cloud.speech_v1 import SpeechClient
@@ -33,14 +33,32 @@ class CloudSpeechToTextHook(GoogleBaseHook):
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
- def __init__(self, gcp_conn_id: str = "google_cloud_default", delegate_to:
Optional[str] = None) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ def __init__(
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._client = None
def get_conn(self) -> SpeechClient:
diff --git a/airflow/providers/google/cloud/hooks/stackdriver.py
b/airflow/providers/google/cloud/hooks/stackdriver.py
index e0ebd7b..2aec897 100644
--- a/airflow/providers/google/cloud/hooks/stackdriver.py
+++ b/airflow/providers/google/cloud/hooks/stackdriver.py
@@ -21,7 +21,7 @@ This module contains GCP Stackdriver operators.
"""
import json
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Sequence, Union
from google.api_core.exceptions import InvalidArgument
from google.api_core.gapic_v1.method import DEFAULT
@@ -38,8 +38,17 @@ class StackdriverHook(GoogleBaseHook):
Stackdriver Hook for connecting with GCP Stackdriver
"""
- def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None):
- super().__init__(gcp_conn_id, delegate_to)
+ def __init__(
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._policy_client = None
self._channel_client = None
diff --git a/airflow/providers/google/cloud/hooks/tasks.py
b/airflow/providers/google/cloud/hooks/tasks.py
index f28b58a..e324012 100644
--- a/airflow/providers/google/cloud/hooks/tasks.py
+++ b/airflow/providers/google/cloud/hooks/tasks.py
@@ -41,14 +41,32 @@ class CloudTasksHook(GoogleBaseHook):
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
- def __init__(self, gcp_conn_id="google_cloud_default", delegate_to=None):
- super().__init__(gcp_conn_id, delegate_to)
+ def __init__(
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._client = None
def get_conn(self):
diff --git a/airflow/providers/google/cloud/hooks/text_to_speech.py
b/airflow/providers/google/cloud/hooks/text_to_speech.py
index 52f3df7..7e16cd3 100644
--- a/airflow/providers/google/cloud/hooks/text_to_speech.py
+++ b/airflow/providers/google/cloud/hooks/text_to_speech.py
@@ -18,7 +18,7 @@
"""
This module contains a Google Cloud Text to Speech Hook.
"""
-from typing import Dict, Optional, Union
+from typing import Dict, Optional, Sequence, Union
from google.api_core.retry import Retry
from google.cloud.texttospeech_v1 import TextToSpeechClient
@@ -38,14 +38,32 @@ class CloudTextToSpeechHook(GoogleBaseHook):
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
- def __init__(self, gcp_conn_id: str = "google_cloud_default", delegate_to:
Optional[str] = None) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ def __init__(
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._client = None # type: Optional[TextToSpeechClient]
def get_conn(self) -> TextToSpeechClient:
diff --git a/airflow/providers/google/cloud/hooks/translate.py
b/airflow/providers/google/cloud/hooks/translate.py
index c6ee3e7..c3fd653 100644
--- a/airflow/providers/google/cloud/hooks/translate.py
+++ b/airflow/providers/google/cloud/hooks/translate.py
@@ -18,7 +18,7 @@
"""
This module contains a Google Cloud Translate Hook.
"""
-from typing import Dict, List, Optional, Union
+from typing import Dict, List, Optional, Sequence, Union
from google.cloud.translate_v2 import Client
@@ -33,8 +33,17 @@ class CloudTranslateHook(GoogleBaseHook):
keyword arguments rather than positional.
"""
- def __init__(self, gcp_conn_id: str = 'google_cloud_default') -> None:
- super().__init__(gcp_conn_id)
+ def __init__(
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._client = None # type: Optional[Client]
def get_conn(self) -> Client:
diff --git a/airflow/providers/google/cloud/hooks/video_intelligence.py
b/airflow/providers/google/cloud/hooks/video_intelligence.py
index 215a29a..a1f1c3f 100644
--- a/airflow/providers/google/cloud/hooks/video_intelligence.py
+++ b/airflow/providers/google/cloud/hooks/video_intelligence.py
@@ -37,14 +37,32 @@ class CloudVideoIntelligenceHook(GoogleBaseHook):
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
- def __init__(self, gcp_conn_id: str = "google_cloud_default", delegate_to:
Optional[str] = None) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ def __init__(
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._conn = None
def get_conn(self) -> VideoIntelligenceServiceClient:
diff --git a/airflow/providers/google/cloud/hooks/vision.py
b/airflow/providers/google/cloud/hooks/vision.py
index b858a92..84c3df5 100644
--- a/airflow/providers/google/cloud/hooks/vision.py
+++ b/airflow/providers/google/cloud/hooks/vision.py
@@ -129,8 +129,17 @@ class CloudVisionHook(GoogleBaseHook):
'ProductSet', 'productset_id', ProductSearchClient.product_set_path
)
- def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to:
Optional[str] = None) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ def __init__(
+ self,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self._client = None
def get_conn(self) -> ProductSearchClient:
diff --git a/airflow/providers/google/cloud/utils/credentials_provider.py
b/airflow/providers/google/cloud/utils/credentials_provider.py
index 76234f8..93cb9e9 100644
--- a/airflow/providers/google/cloud/utils/credentials_provider.py
+++ b/airflow/providers/google/cloud/utils/credentials_provider.py
@@ -23,12 +23,13 @@ import json
import logging
import tempfile
from contextlib import ExitStack, contextmanager
-from typing import Collection, Dict, Optional, Sequence, Tuple
+from typing import Collection, Dict, Optional, Sequence, Tuple, Union
from urllib.parse import urlencode
import google.auth
import google.auth.credentials
import google.oauth2.service_account
+from google.auth import impersonated_credentials
from google.auth.environment_vars import CREDENTIALS, LEGACY_PROJECT, PROJECT
from airflow.exceptions import AirflowException
@@ -191,12 +192,22 @@ class _CredentialProvider(LoggingMixin):
:type keyfile_dict: Dict[str, str]
:param scopes: OAuth scopes for the connection
:type scopes: Collection[str]
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
:param disable_logging: If true, disable all log messages, which allows
you to use this
class to configure Logger.
+ :param target_principal: The service account to directly impersonate using
short-term
+ credentials, if any. For this to work, the target_principal account
must grant
+ the originating account the Service Account Token Creator IAM role.
+ :type target_principal: str
+ :param delegates: optional chained list of accounts required to get the
access_token of
+ target_principal. If set, the sequence of identities from the list
must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
and target_principal
+ granting the role to the last account from the list.
+ :type delegates: Sequence[str]
"""
def __init__(
self,
@@ -205,7 +216,9 @@ class _CredentialProvider(LoggingMixin):
# See: https://github.com/PyCQA/pylint/issues/2377
scopes: Optional[Collection[str]] = None, # pylint:
disable=unsubscriptable-object
delegate_to: Optional[str] = None,
- disable_logging: bool = False
+ disable_logging: bool = False,
+ target_principal: Optional[str] = None,
+ delegates: Optional[Sequence[str]] = None,
):
super().__init__()
if key_path and keyfile_dict:
@@ -218,6 +231,8 @@ class _CredentialProvider(LoggingMixin):
self.scopes = scopes
self.delegate_to = delegate_to
self.disable_logging = disable_logging
+ self.target_principal = target_principal
+ self.delegates = delegates
def get_credentials_and_project(self):
"""
@@ -243,6 +258,14 @@ class _CredentialProvider(LoggingMixin):
"Please use service-account for authorization."
)
+ if self.target_principal:
+ credentials = impersonated_credentials.Credentials(
+ source_credentials=credentials,
+ target_principal=self.target_principal,
+ delegates=self.delegates,
+ target_scopes=self.scopes
+ )
+
return credentials, project_id
def _get_credentials_using_keyfile_dict(self):
@@ -311,3 +334,27 @@ def _get_scopes(scopes: Optional[str] = None) ->
Sequence[str]:
"""
return [s.strip() for s in scopes.split(',')] \
if scopes else _DEFAULT_SCOPES
+
+
+def _get_target_principal_and_delegates(
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None
+) -> Tuple[Optional[str], Optional[Sequence[str]]]:
+ """
+ Analyze contents of impersonation_chain and return target_principal (the
service account
+ to directly impersonate using short-term credentials, if any) and optional
list of delegates
+ required to get the access_token of target_principal.
+
+ :param impersonation_chain: the service account to impersonate or a
chained list leading to this
+ account
+ :type impersonation_chain: Optional[Union[str, Sequence[str]]]
+
+ :return: Returns the tuple of target_principal and delegates
+ :rtype: Tuple[Optional[str], Optional[Sequence[str]]]
+ """
+ if not impersonation_chain:
+ return None, None
+
+ if isinstance(impersonation_chain, str):
+ return impersonation_chain, None
+
+ return impersonation_chain[-1], impersonation_chain[:-1]
diff --git a/airflow/providers/google/common/hooks/base_google.py
b/airflow/providers/google/common/hooks/base_google.py
index 4e942a7..62e22e1 100644
--- a/airflow/providers/google/common/hooks/base_google.py
+++ b/airflow/providers/google/common/hooks/base_google.py
@@ -26,7 +26,7 @@ import os
import tempfile
from contextlib import contextmanager
from subprocess import check_output
-from typing import Any, Callable, Dict, Optional, Sequence, Tuple, TypeVar,
cast
+from typing import Any, Callable, Dict, Optional, Sequence, Tuple, TypeVar,
Union, cast
import google.auth
import google.auth.credentials
@@ -44,7 +44,7 @@ from airflow import version
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.providers.google.cloud.utils.credentials_provider import (
- _get_scopes, get_credentials_and_project_id,
+ _get_scopes, _get_target_principal_and_delegates,
get_credentials_and_project_id,
)
from airflow.utils.process_utils import patch_environ
@@ -148,16 +148,31 @@ class GoogleBaseHook(BaseHook):
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
- def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to:
Optional[str] = None) -> None:
+ def __init__(
+ self,
+ gcp_conn_id: str = 'google_cloud_default',
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
super().__init__()
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
+ self.impersonation_chain = impersonation_chain
self.extras = self.get_connection(self.gcp_conn_id).extra_dejson #
type: Dict
self._cached_credentials:
Optional[google.auth.credentials.Credentials] = None
self._cached_project_id: Optional[str] = None
@@ -178,11 +193,15 @@ class GoogleBaseHook(BaseHook):
except json.decoder.JSONDecodeError:
raise AirflowException('Invalid key JSON.')
+ target_principal, delegates =
_get_target_principal_and_delegates(self.impersonation_chain)
+
credentials, project_id = get_credentials_and_project_id(
key_path=key_path,
keyfile_dict=keyfile_dict_json,
scopes=self.scopes,
- delegate_to=self.delegate_to
+ delegate_to=self.delegate_to,
+ target_principal=target_principal,
+ delegates=delegates,
)
overridden_project_id = self._get_field('project')
diff --git a/airflow/providers/google/common/hooks/discovery_api.py
b/airflow/providers/google/common/hooks/discovery_api.py
index 22adbb0..b001b54 100644
--- a/airflow/providers/google/common/hooks/discovery_api.py
+++ b/airflow/providers/google/common/hooks/discovery_api.py
@@ -19,7 +19,7 @@
"""
This module allows you to connect to the Google Discovery API Service and
query it.
"""
-from typing import Dict, Optional
+from typing import Dict, Optional, Sequence, Union
from googleapiclient.discovery import Resource, build
@@ -37,10 +37,19 @@ class GoogleDiscoveryApiHook(GoogleBaseHook):
:type api_version: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
_conn = None # type: Optional[Resource]
@@ -48,10 +57,15 @@ class GoogleDiscoveryApiHook(GoogleBaseHook):
self,
api_service_name: str,
api_version: str,
- gcp_conn_id='google_cloud_default',
- delegate_to: Optional[str] = None
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id=gcp_conn_id, delegate_to=delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self.api_service_name = api_service_name
self.api_version = api_version
diff --git a/airflow/providers/google/firebase/hooks/firestore.py
b/airflow/providers/google/firebase/hooks/firestore.py
index 162ca4d..2da903d 100644
--- a/airflow/providers/google/firebase/hooks/firestore.py
+++ b/airflow/providers/google/firebase/hooks/firestore.py
@@ -18,7 +18,7 @@
"""Hook for Google Cloud Firestore service"""
import time
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Sequence, Union
from googleapiclient.discovery import build, build_from_document
@@ -41,10 +41,19 @@ class CloudFirestoreHook(GoogleBaseHook):
:type api_version: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
_conn = None # type: Optional[Any]
@@ -54,8 +63,13 @@ class CloudFirestoreHook(GoogleBaseHook):
api_version: str = "v1",
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self.api_version = api_version
def get_conn(self):
diff --git
a/airflow/providers/google/marketing_platform/hooks/campaign_manager.py
b/airflow/providers/google/marketing_platform/hooks/campaign_manager.py
index 77c1f56..5c8ccdd 100644
--- a/airflow/providers/google/marketing_platform/hooks/campaign_manager.py
+++ b/airflow/providers/google/marketing_platform/hooks/campaign_manager.py
@@ -18,7 +18,7 @@
"""
This module contains Google Campaign Manager hook.
"""
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Sequence, Union
from googleapiclient import http
from googleapiclient.discovery import Resource, build
@@ -39,8 +39,13 @@ class GoogleCampaignManagerHook(GoogleBaseHook):
api_version: str = "v3.3",
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self.api_version = api_version
def get_conn(self) -> Resource:
diff --git a/airflow/providers/google/marketing_platform/hooks/display_video.py
b/airflow/providers/google/marketing_platform/hooks/display_video.py
index c2c77ee..c3c7364 100644
--- a/airflow/providers/google/marketing_platform/hooks/display_video.py
+++ b/airflow/providers/google/marketing_platform/hooks/display_video.py
@@ -19,7 +19,7 @@
This module contains Google DisplayVideo hook.
"""
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Sequence, Union
from googleapiclient.discovery import Resource, build
@@ -38,8 +38,13 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
api_version: str = "v1",
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self.api_version = api_version
def get_conn(self) -> Resource:
diff --git a/airflow/providers/google/marketing_platform/hooks/search_ads.py
b/airflow/providers/google/marketing_platform/hooks/search_ads.py
index 2264cc1..f6342aa 100644
--- a/airflow/providers/google/marketing_platform/hooks/search_ads.py
+++ b/airflow/providers/google/marketing_platform/hooks/search_ads.py
@@ -18,7 +18,7 @@
"""
This module contains Google Search Ads 360 hook.
"""
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Sequence, Union
from googleapiclient.discovery import build
@@ -37,8 +37,13 @@ class GoogleSearchAdsHook(GoogleBaseHook):
api_version: str = "v2",
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self.api_version = api_version
def get_conn(self):
diff --git a/airflow/providers/google/suite/hooks/drive.py
b/airflow/providers/google/suite/hooks/drive.py
index bac3557..16613fb 100644
--- a/airflow/providers/google/suite/hooks/drive.py
+++ b/airflow/providers/google/suite/hooks/drive.py
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""Hook for Google Drive service"""
-from typing import Any, Optional
+from typing import Any, Optional, Sequence, Union
from googleapiclient.discovery import Resource, build
from googleapiclient.http import MediaFileUpload
@@ -33,10 +33,19 @@ class GoogleDriveHook(GoogleBaseHook):
:type api_version: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
_conn = None # type: Optional[Resource]
@@ -45,9 +54,14 @@ class GoogleDriveHook(GoogleBaseHook):
self,
api_version: str = "v3",
gcp_conn_id: str = "google_cloud_default",
- delegate_to: Optional[str] = None
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self.api_version = api_version
def get_conn(self) -> Any:
diff --git a/airflow/providers/google/suite/hooks/sheets.py
b/airflow/providers/google/suite/hooks/sheets.py
index 71e5b4e..2f973b3 100644
--- a/airflow/providers/google/suite/hooks/sheets.py
+++ b/airflow/providers/google/suite/hooks/sheets.py
@@ -20,7 +20,7 @@
This module contains a Google Sheets API hook
"""
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Sequence, Union
from googleapiclient.discovery import build
@@ -38,19 +38,33 @@ class GSheetsHook(GoogleBaseHook):
:type gcp_conn_id: str
:param api_version: API Version
:type api_version: str
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
"""
def __init__(
self,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v4',
- delegate_to: Optional[str] = None
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
- super().__init__(gcp_conn_id, delegate_to)
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
self.gcp_conn_id = gcp_conn_id
self.api_version = api_version
self.delegate_to = delegate_to
diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py
b/tests/providers/google/cloud/hooks/test_bigquery.py
index 8286a8e..a9f5472 100644
--- a/tests/providers/google/cloud/hooks/test_bigquery.py
+++ b/tests/providers/google/cloud/hooks/test_bigquery.py
@@ -84,7 +84,11 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
"You should pass the gcp_conn_id parameter."
with self.assertWarns(DeprecationWarning) as warn:
BigQueryHook(bigquery_conn_id=bigquery_conn_id)
- mock_base_hook_init.assert_called_once_with(delegate_to=None,
gcp_conn_id='bigquery conn id')
+ mock_base_hook_init.assert_called_once_with(
+ delegate_to=None,
+ gcp_conn_id='bigquery conn id',
+ impersonation_chain=None,
+ )
self.assertEqual(warning_message, str(warn.warning))
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_service")
diff --git a/tests/providers/google/cloud/hooks/test_dataflow.py
b/tests/providers/google/cloud/hooks/test_dataflow.py
index d3568f3..0b2ad7d 100644
--- a/tests/providers/google/cloud/hooks/test_dataflow.py
+++ b/tests/providers/google/cloud/hooks/test_dataflow.py
@@ -142,7 +142,12 @@ class TestFallbackToVariables(unittest.TestCase):
FixutureFallback().test_fn({'project': "TEST"}, "TEST2")
-def mock_init(self, gcp_conn_id, delegate_to=None): # pylint:
disable=unused-argument
+def mock_init(
+ self,
+ gcp_conn_id,
+ delegate_to=None,
+ impersonation_chain=None,
+): # pylint: disable=unused-argument
pass
diff --git a/tests/providers/google/cloud/hooks/test_datastore.py
b/tests/providers/google/cloud/hooks/test_datastore.py
index b1a0cde..93dd663 100644
--- a/tests/providers/google/cloud/hooks/test_datastore.py
+++ b/tests/providers/google/cloud/hooks/test_datastore.py
@@ -28,7 +28,12 @@ from airflow.providers.google.cloud.hooks.datastore import
DatastoreHook
GCP_PROJECT_ID = "test"
-def mock_init(self, gcp_conn_id, delegate_to=None): # pylint:
disable=unused-argument
+def mock_init(
+ self,
+ gcp_conn_id,
+ delegate_to=None,
+ impersonation_chain=None,
+): # pylint: disable=unused-argument
pass
diff --git a/tests/providers/google/cloud/hooks/test_gdm.py
b/tests/providers/google/cloud/hooks/test_gdm.py
index e11f9a4..67e00e2 100644
--- a/tests/providers/google/cloud/hooks/test_gdm.py
+++ b/tests/providers/google/cloud/hooks/test_gdm.py
@@ -23,7 +23,12 @@ from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.gdm import
GoogleDeploymentManagerHook
-def mock_init(self, gcp_conn_id, delegate_to=None): # pylint:
disable=unused-argument
+def mock_init(
+ self,
+ gcp_conn_id,
+ delegate_to=None,
+ impersonation_chain=None,
+): # pylint: disable=unused-argument
pass
diff --git a/tests/providers/google/cloud/hooks/test_kms.py
b/tests/providers/google/cloud/hooks/test_kms.py
index 525839d..a1b1db2 100644
--- a/tests/providers/google/cloud/hooks/test_kms.py
+++ b/tests/providers/google/cloud/hooks/test_kms.py
@@ -45,7 +45,12 @@ TEST_KEY_ID =
"projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}".format(
RESPONSE = Response(PLAINTEXT, PLAINTEXT)
-def mock_init(self, gcp_conn_id, delegate_to=None): # pylint:
disable=unused-argument
+def mock_init(
+ self,
+ gcp_conn_id,
+ delegate_to=None,
+ impersonation_chain=None,
+): # pylint: disable=unused-argument
pass
diff --git a/tests/providers/google/cloud/hooks/test_pubsub.py
b/tests/providers/google/cloud/hooks/test_pubsub.py
index 0d3b992..f208781 100644
--- a/tests/providers/google/cloud/hooks/test_pubsub.py
+++ b/tests/providers/google/cloud/hooks/test_pubsub.py
@@ -51,7 +51,12 @@ EXPANDED_SUBSCRIPTION =
'projects/{}/subscriptions/{}'.format(TEST_PROJECT, TEST
LABELS = {'airflow-version': 'v' + version.replace('.', '-').replace('+', '-')}
-def mock_init(self, gcp_conn_id, delegate_to=None): # pylint:
disable=unused-argument
+def mock_init(
+ self,
+ gcp_conn_id,
+ delegate_to=None,
+ impersonation_chain=None,
+): # pylint: disable=unused-argument
pass
diff --git a/tests/providers/google/cloud/utils/base_gcp_mock.py
b/tests/providers/google/cloud/utils/base_gcp_mock.py
index 04b796c..d20696b 100644
--- a/tests/providers/google/cloud/utils/base_gcp_mock.py
+++ b/tests/providers/google/cloud/utils/base_gcp_mock.py
@@ -23,22 +23,34 @@ from airflow.models import Connection
GCP_PROJECT_ID_HOOK_UNIT_TEST = 'example-project'
-def mock_base_gcp_hook_default_project_id(self,
gcp_conn_id='google_cloud_default', delegate_to=None):
+def mock_base_gcp_hook_default_project_id(
+ self,
+ gcp_conn_id='google_cloud_default',
+ delegate_to=None,
+ impersonation_chain=None,
+):
self.extras = {
'extra__google_cloud_platform__project': GCP_PROJECT_ID_HOOK_UNIT_TEST
}
self._conn = gcp_conn_id
self.delegate_to = delegate_to
+ self.impersonation_chain = impersonation_chain
self._client = None
self._conn = None
self._cached_credentials = None
self._cached_project_id = None
-def mock_base_gcp_hook_no_default_project_id(self,
gcp_conn_id='google_cloud_default', delegate_to=None):
+def mock_base_gcp_hook_no_default_project_id(
+ self,
+ gcp_conn_id='google_cloud_default',
+ delegate_to=None,
+ impersonation_chain=None,
+):
self.extras = {}
self._conn = gcp_conn_id
self.delegate_to = delegate_to
+ self.impersonation_chain = impersonation_chain
self._client = None
self._conn = None
self._cached_credentials = None
diff --git a/tests/providers/google/cloud/utils/test_credentials_provider.py
b/tests/providers/google/cloud/utils/test_credentials_provider.py
index 0827de1..5066bb5 100644
--- a/tests/providers/google/cloud/utils/test_credentials_provider.py
+++ b/tests/providers/google/cloud/utils/test_credentials_provider.py
@@ -28,8 +28,8 @@ from parameterized import parameterized
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.utils.credentials_provider import (
- _DEFAULT_SCOPES, AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT, _get_scopes,
build_gcp_conn,
- get_credentials_and_project_id, provide_gcp_conn_and_credentials,
provide_gcp_connection,
+ _DEFAULT_SCOPES, AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT, _get_scopes,
_get_target_principal_and_delegates,
+ build_gcp_conn, get_credentials_and_project_id,
provide_gcp_conn_and_credentials, provide_gcp_connection,
provide_gcp_credentials,
)
@@ -171,6 +171,69 @@ class TestGetGcpCredentialsAndProjectId(unittest.TestCase):
mock_auth_default.assert_called_once_with(scopes=scopes)
self.assertEqual(mock_auth_default.return_value, result)
+ @mock.patch('airflow.providers.google.cloud.utils.credentials_provider.'
+ 'impersonated_credentials.Credentials')
+ @mock.patch('google.auth.default')
+ def
test_get_credentials_and_project_id_with_default_auth_and_target_principal(
+ self, mock_auth_default, mock_impersonated_credentials
+ ):
+ mock_credentials = mock.MagicMock()
+ mock_auth_default.return_value = (mock_credentials,
self.test_project_id)
+
+ result = get_credentials_and_project_id(target_principal="ACCOUNT_1")
+ mock_auth_default.assert_called_once_with(scopes=None)
+ mock_impersonated_credentials.assert_called_once_with(
+ source_credentials=mock_credentials,
+ target_principal="ACCOUNT_1",
+ delegates=None,
+ target_scopes=None,
+ )
+ self.assertEqual((mock_impersonated_credentials.return_value,
self.test_project_id), result)
+
+ @mock.patch('airflow.providers.google.cloud.utils.credentials_provider.'
+ 'impersonated_credentials.Credentials')
+ @mock.patch('google.auth.default')
+ def
test_get_credentials_and_project_id_with_default_auth_and_scopes_and_target_principal(
+ self, mock_auth_default, mock_impersonated_credentials
+ ):
+ mock_credentials = mock.MagicMock()
+ mock_auth_default.return_value = (mock_credentials,
self.test_project_id)
+
+ result = get_credentials_and_project_id(
+ scopes=['scope1', 'scope2'],
+ target_principal="ACCOUNT_1",
+ )
+ mock_auth_default.assert_called_once_with(scopes=['scope1', 'scope2'])
+ mock_impersonated_credentials.assert_called_once_with(
+ source_credentials=mock_credentials,
+ target_principal="ACCOUNT_1",
+ delegates=None,
+ target_scopes=['scope1', 'scope2'],
+ )
+ self.assertEqual((mock_impersonated_credentials.return_value,
self.test_project_id), result)
+
+ @mock.patch('airflow.providers.google.cloud.utils.credentials_provider.'
+ 'impersonated_credentials.Credentials')
+ @mock.patch('google.auth.default')
+ def
test_get_credentials_and_project_id_with_default_auth_and_target_principal_and_delegates(
+ self, mock_auth_default, mock_impersonated_credentials
+ ):
+ mock_credentials = mock.MagicMock()
+ mock_auth_default.return_value = (mock_credentials,
self.test_project_id)
+
+ result = get_credentials_and_project_id(
+ target_principal="ACCOUNT_3",
+ delegates=["ACCOUNT_1", "ACCOUNT_2"],
+ )
+ mock_auth_default.assert_called_once_with(scopes=None)
+ mock_impersonated_credentials.assert_called_once_with(
+ source_credentials=mock_credentials,
+ target_principal="ACCOUNT_3",
+ delegates=["ACCOUNT_1", "ACCOUNT_2"],
+ target_scopes=None,
+ )
+ self.assertEqual((mock_impersonated_credentials.return_value,
self.test_project_id), result)
+
@mock.patch(
'google.oauth2.service_account.Credentials.from_service_account_file',
)
@@ -256,3 +319,24 @@ class TestGetScopes(unittest.TestCase):
])
def test_get_scopes_with_input(self, _, scopes_str, scopes):
self.assertEqual(_get_scopes(scopes_str), scopes)
+
+
+class TestGetTargetPrincipalAndDelegates(unittest.TestCase):
+
+ def test_get_target_principal_and_delegates_no_argument(self):
+ self.assertEqual(_get_target_principal_and_delegates(), (None, None))
+
+ @parameterized.expand([
+ ('string', "ACCOUNT_1", ("ACCOUNT_1", None)),
+ ('empty_list', [], (None, None)),
+ ('single_element_list', ["ACCOUNT_1"], ("ACCOUNT_1", [])),
+ ('multiple_elements_list',
+ ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"], ("ACCOUNT_3", ["ACCOUNT_1",
"ACCOUNT_2"])),
+ ])
+ def test_get_target_principal_and_delegates_with_input(
+ self, _, impersonation_chain, target_principal_and_delegates
+ ):
+ self.assertEqual(
+ _get_target_principal_and_delegates(impersonation_chain),
+ target_principal_and_delegates
+ )
diff --git a/tests/providers/google/common/hooks/test_base_google.py
b/tests/providers/google/common/hooks/test_base_google.py
index 4f1e91d..5be2e69 100644
--- a/tests/providers/google/common/hooks/test_base_google.py
+++ b/tests/providers/google/common/hooks/test_base_google.py
@@ -28,6 +28,7 @@ import tenacity
from google.auth.environment_vars import CREDENTIALS
from google.auth.exceptions import GoogleAuthError
from google.cloud.exceptions import Forbidden
+from parameterized import parameterized
from airflow import version
from airflow.exceptions import AirflowException
@@ -43,6 +44,7 @@ except GoogleAuthError:
default_creds_available = False
MODULE_NAME = "airflow.providers.google.common.hooks.base_google"
+PROJECT_ID = "PROJECT_ID"
class NoForbiddenAfterCount:
@@ -341,7 +343,10 @@ class TestGoogleBaseHook(unittest.TestCase):
key_path=None,
keyfile_dict=None,
scopes=self.instance.scopes,
- delegate_to=None)
+ delegate_to=None,
+ target_principal=None,
+ delegates=None,
+ )
self.assertEqual(('CREDENTIALS', 'PROJECT_ID'), result)
@mock.patch(MODULE_NAME + '.get_credentials_and_project_id')
@@ -359,7 +364,9 @@ class TestGoogleBaseHook(unittest.TestCase):
key_path='KEY_PATH.json',
keyfile_dict=None,
scopes=self.instance.scopes,
- delegate_to=None
+ delegate_to=None,
+ target_principal=None,
+ delegates=None,
)
self.assertEqual((mock_credentials, 'PROJECT_ID'), result)
@@ -399,7 +406,9 @@ class TestGoogleBaseHook(unittest.TestCase):
key_path=None,
keyfile_dict=service_account,
scopes=self.instance.scopes,
- delegate_to=None
+ delegate_to=None,
+ target_principal=None,
+ delegates=None,
)
self.assertEqual((mock_credentials, 'PROJECT_ID'), result)
@@ -417,7 +426,9 @@ class TestGoogleBaseHook(unittest.TestCase):
key_path=None,
keyfile_dict=None,
scopes=self.instance.scopes,
- delegate_to="USER"
+ delegate_to="USER",
+ target_principal=None,
+ delegates=None,
)
self.assertEqual((mock_credentials, "PROJECT_ID"), result)
@@ -451,7 +462,9 @@ class TestGoogleBaseHook(unittest.TestCase):
key_path=None,
keyfile_dict=None,
scopes=self.instance.scopes,
- delegate_to=None
+ delegate_to=None,
+ target_principal=None,
+ delegates=None,
)
self.assertEqual(("CREDENTIALS", 'SECOND_PROJECT_ID'), result)
@@ -632,6 +645,35 @@ class TestGoogleBaseHook(unittest.TestCase):
http_authorized = self.instance._authorize().http
self.assertNotEqual(http_authorized.timeout, None)
+ @parameterized.expand([
+ ('string', "ACCOUNT_1", "ACCOUNT_1", None),
+ ('single_element_list', ["ACCOUNT_1"], "ACCOUNT_1", []),
+ ('multiple_elements_list',
+ ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"], "ACCOUNT_3", ["ACCOUNT_1",
"ACCOUNT_2"]),
+ ])
+ @mock.patch(MODULE_NAME + '.get_credentials_and_project_id')
+ def test_get_credentials_and_project_id_with_impersonation_chain(
+ self,
+ _,
+ impersonation_chain,
+ target_principal,
+ delegates,
+ mock_get_creds_and_proj_id,
+ ):
+ mock_credentials = mock.MagicMock()
+ mock_get_creds_and_proj_id.return_value = (mock_credentials,
PROJECT_ID)
+ self.instance.impersonation_chain = impersonation_chain
+ result = self.instance._get_credentials_and_project_id()
+ mock_get_creds_and_proj_id.assert_called_once_with(
+ key_path=None,
+ keyfile_dict=None,
+ scopes=self.instance.scopes,
+ delegate_to=None,
+ target_principal=target_principal,
+ delegates=delegates,
+ )
+ self.assertEqual((mock_credentials, PROJECT_ID), result)
+
class TestProvideAuthorizedGcloud(unittest.TestCase):
def setUp(self):