This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5eacc16  Add support for impersonation in GCP hooks (#9915)
5eacc16 is described below

commit 5eacc164201a121cd06126aff613cbe0919d35cc
Author: Kamil Olszewski <[email protected]>
AuthorDate: Wed Jul 22 01:02:32 2020 +0200

    Add support for impersonation in GCP hooks (#9915)
    
    Co-authored-by: Kamil Olszewski <[email protected]>
---
 airflow/providers/google/cloud/hooks/automl.py     | 13 +++-
 airflow/providers/google/cloud/hooks/bigquery.py   |  6 +-
 .../providers/google/cloud/hooks/bigquery_dts.py   | 11 ++-
 airflow/providers/google/cloud/hooks/bigtable.py   | 15 +++-
 .../providers/google/cloud/hooks/cloud_build.py    | 25 ++++--
 .../google/cloud/hooks/cloud_memorystore.py        | 26 ++++++-
 airflow/providers/google/cloud/hooks/cloud_sql.py  | 13 +++-
 .../cloud/hooks/cloud_storage_transfer_service.py  | 13 +++-
 airflow/providers/google/cloud/hooks/compute.py    | 11 ++-
 .../providers/google/cloud/hooks/datacatalog.py    | 26 ++++++-
 airflow/providers/google/cloud/hooks/dataflow.py   | 11 ++-
 airflow/providers/google/cloud/hooks/datafusion.py |  9 ++-
 airflow/providers/google/cloud/hooks/datastore.py  | 11 ++-
 airflow/providers/google/cloud/hooks/dlp.py        | 26 ++++++-
 airflow/providers/google/cloud/hooks/functions.py  | 13 +++-
 airflow/providers/google/cloud/hooks/gcs.py        | 15 ++--
 airflow/providers/google/cloud/hooks/gdm.py        | 15 +++-
 airflow/providers/google/cloud/hooks/kms.py        | 24 ++++--
 .../google/cloud/hooks/kubernetes_engine.py        | 10 ++-
 .../providers/google/cloud/hooks/life_sciences.py  | 24 ++++--
 .../google/cloud/hooks/natural_language.py         | 26 ++++++-
 airflow/providers/google/cloud/hooks/pubsub.py     | 13 +++-
 .../providers/google/cloud/hooks/secret_manager.py | 24 ++++--
 airflow/providers/google/cloud/hooks/spanner.py    | 15 +++-
 .../providers/google/cloud/hooks/speech_to_text.py | 28 +++++--
 .../providers/google/cloud/hooks/stackdriver.py    | 15 +++-
 airflow/providers/google/cloud/hooks/tasks.py      | 26 ++++++-
 .../providers/google/cloud/hooks/text_to_speech.py | 28 +++++--
 airflow/providers/google/cloud/hooks/translate.py  | 15 +++-
 .../google/cloud/hooks/video_intelligence.py       | 26 ++++++-
 airflow/providers/google/cloud/hooks/vision.py     | 13 +++-
 .../google/cloud/utils/credentials_provider.py     | 55 +++++++++++++-
 .../providers/google/common/hooks/base_google.py   | 31 ++++++--
 .../providers/google/common/hooks/discovery_api.py | 26 +++++--
 .../providers/google/firebase/hooks/firestore.py   | 22 +++++-
 .../marketing_platform/hooks/campaign_manager.py   |  9 ++-
 .../marketing_platform/hooks/display_video.py      |  9 ++-
 .../google/marketing_platform/hooks/search_ads.py  |  9 ++-
 airflow/providers/google/suite/hooks/drive.py      | 24 ++++--
 airflow/providers/google/suite/hooks/sheets.py     | 24 ++++--
 .../providers/google/cloud/hooks/test_bigquery.py  |  6 +-
 .../providers/google/cloud/hooks/test_dataflow.py  |  7 +-
 .../providers/google/cloud/hooks/test_datastore.py |  7 +-
 tests/providers/google/cloud/hooks/test_gdm.py     |  7 +-
 tests/providers/google/cloud/hooks/test_kms.py     |  7 +-
 tests/providers/google/cloud/hooks/test_pubsub.py  |  7 +-
 .../providers/google/cloud/utils/base_gcp_mock.py  | 16 +++-
 .../cloud/utils/test_credentials_provider.py       | 88 +++++++++++++++++++++-
 .../google/common/hooks/test_base_google.py        | 52 +++++++++++--
 49 files changed, 791 insertions(+), 161 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/automl.py 
b/airflow/providers/google/cloud/hooks/automl.py
index 5c2ea89..a70b599 100644
--- a/airflow/providers/google/cloud/hooks/automl.py
+++ b/airflow/providers/google/cloud/hooks/automl.py
@@ -41,9 +41,16 @@ class CloudAutoMLHook(GoogleBaseHook):
     """
 
     def __init__(
-        self, gcp_conn_id: str = "google_cloud_default", delegate_to: 
Optional[str] = None
-    ):
-        super().__init__(gcp_conn_id, delegate_to)
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._client = None  # type: Optional[AutoMlClient]
 
     @staticmethod
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py 
b/airflow/providers/google/cloud/hooks/bigquery.py
index efdd6c3..cd186b8 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -61,6 +61,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
     def __init__(self,
                  gcp_conn_id: str = 'google_cloud_default',
                  delegate_to: Optional[str] = None,
+                 impersonation_chain: Optional[Union[str, Sequence[str]]] = 
None,
                  use_legacy_sql: bool = True,
                  location: Optional[str] = None,
                  bigquery_conn_id: Optional[str] = None,
@@ -73,7 +74,10 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
                 "the gcp_conn_id parameter.", DeprecationWarning, stacklevel=2)
             gcp_conn_id = bigquery_conn_id
         super().__init__(
-            gcp_conn_id=gcp_conn_id, delegate_to=delegate_to)
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self.use_legacy_sql = use_legacy_sql
         self.location = location
         self.running_job_id = None  # type: Optional[str]
diff --git a/airflow/providers/google/cloud/hooks/bigquery_dts.py 
b/airflow/providers/google/cloud/hooks/bigquery_dts.py
index bd39bb5..efea859 100644
--- a/airflow/providers/google/cloud/hooks/bigquery_dts.py
+++ b/airflow/providers/google/cloud/hooks/bigquery_dts.py
@@ -51,9 +51,16 @@ class BiqQueryDataTransferServiceHook(GoogleBaseHook):
     _conn = None  # type: Optional[Resource]
 
     def __init__(
-        self, gcp_conn_id: str = "google_cloud_default", delegate_to: 
Optional[str] = None
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id=gcp_conn_id, delegate_to=delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
 
     @staticmethod
     def _disable_auto_scheduling(config: Union[dict, TransferConfig]) -> 
TransferConfig:
diff --git a/airflow/providers/google/cloud/hooks/bigtable.py 
b/airflow/providers/google/cloud/hooks/bigtable.py
index 49188a1..0627fcc 100644
--- a/airflow/providers/google/cloud/hooks/bigtable.py
+++ b/airflow/providers/google/cloud/hooks/bigtable.py
@@ -18,7 +18,7 @@
 """
 This module contains a Google Cloud Bigtable Hook.
 """
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Sequence, Union
 
 from google.cloud.bigtable import Client
 from google.cloud.bigtable.cluster import Cluster
@@ -39,8 +39,17 @@ class BigtableHook(GoogleBaseHook):
     """
 
     # pylint: disable=too-many-arguments
-    def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to: 
Optional[str] = None) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._client = None
 
     def _get_client(self, project_id: str):
diff --git a/airflow/providers/google/cloud/hooks/cloud_build.py 
b/airflow/providers/google/cloud/hooks/cloud_build.py
index 85123f1..db9a3eb 100644
--- a/airflow/providers/google/cloud/hooks/cloud_build.py
+++ b/airflow/providers/google/cloud/hooks/cloud_build.py
@@ -18,7 +18,7 @@
 """Hook for Google Cloud Build service"""
 
 import time
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Sequence, Union
 
 from googleapiclient.discovery import build
 
@@ -41,10 +41,19 @@ class CloudBuildHook(GoogleBaseHook):
     :type api_version: str
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
 
     _conn = None  # type: Optional[Any]
@@ -53,9 +62,15 @@ class CloudBuildHook(GoogleBaseHook):
         self,
         api_version: str = "v1",
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: Optional[str] = None
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
+
         self.api_version = api_version
 
     def get_conn(self):
diff --git a/airflow/providers/google/cloud/hooks/cloud_memorystore.py 
b/airflow/providers/google/cloud/hooks/cloud_memorystore.py
index 64ef45a..bb37e4a 100644
--- a/airflow/providers/google/cloud/hooks/cloud_memorystore.py
+++ b/airflow/providers/google/cloud/hooks/cloud_memorystore.py
@@ -41,14 +41,32 @@ class CloudMemorystoreHook(GoogleBaseHook):
 
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
 
-    def __init__(self, gcp_conn_id: str = "google_cloud_default", delegate_to: 
Optional[str] = None):
-        super().__init__(gcp_conn_id, delegate_to)
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._client = None  # type: Optional[CloudRedisClient]
 
     def get_conn(self,):
diff --git a/airflow/providers/google/cloud/hooks/cloud_sql.py 
b/airflow/providers/google/cloud/hooks/cloud_sql.py
index 2da321e..d6ef47e 100644
--- a/airflow/providers/google/cloud/hooks/cloud_sql.py
+++ b/airflow/providers/google/cloud/hooks/cloud_sql.py
@@ -34,7 +34,7 @@ import subprocess
 import time
 import uuid
 from subprocess import PIPE, Popen
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, Sequence, Union
 from urllib.parse import quote_plus
 
 import requests
@@ -80,10 +80,15 @@ class CloudSQLHook(GoogleBaseHook):
     def __init__(
         self,
         api_version: str,
-        gcp_conn_id: str = 'google_cloud_default',
-        delegate_to: Optional[str] = None
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self.api_version = api_version
         self._conn = None
 
diff --git 
a/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py 
b/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
index d901e5f..ec31ed2 100644
--- a/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
+++ b/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
@@ -25,7 +25,7 @@ import time
 import warnings
 from copy import deepcopy
 from datetime import timedelta
-from typing import Dict, List, Optional, Set, Union
+from typing import Dict, List, Optional, Sequence, Set, Union
 
 from googleapiclient.discovery import build
 from googleapiclient.errors import HttpError
@@ -129,10 +129,15 @@ class CloudDataTransferServiceHook(GoogleBaseHook):
     def __init__(
         self,
         api_version: str = 'v1',
-        gcp_conn_id: str = 'google_cloud_default',
-        delegate_to: Optional[str] = None
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self.api_version = api_version
         self._conn = None
 
diff --git a/airflow/providers/google/cloud/hooks/compute.py 
b/airflow/providers/google/cloud/hooks/compute.py
index e3075fa..953caf6 100644
--- a/airflow/providers/google/cloud/hooks/compute.py
+++ b/airflow/providers/google/cloud/hooks/compute.py
@@ -20,7 +20,7 @@ This module contains a Google Compute Engine Hook.
 """
 
 import time
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Sequence, Union
 
 from googleapiclient.discovery import build
 
@@ -54,9 +54,14 @@ class ComputeEngineHook(GoogleBaseHook):
         self,
         api_version: str = 'v1',
         gcp_conn_id: str = 'google_cloud_default',
-        delegate_to: Optional[str] = None
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self.api_version = api_version
 
     def get_conn(self):
diff --git a/airflow/providers/google/cloud/hooks/datacatalog.py 
b/airflow/providers/google/cloud/hooks/datacatalog.py
index 49b4c16..5c02118 100644
--- a/airflow/providers/google/cloud/hooks/datacatalog.py
+++ b/airflow/providers/google/cloud/hooks/datacatalog.py
@@ -33,14 +33,32 @@ class CloudDataCatalogHook(GoogleBaseHook):
 
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
 
-    def __init__(self, gcp_conn_id: str = "google_cloud_default", delegate_to: 
Optional[str] = None) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._client: Optional[DataCatalogClient] = None
 
     def get_conn(self) -> DataCatalogClient:
diff --git a/airflow/providers/google/cloud/hooks/dataflow.py 
b/airflow/providers/google/cloud/hooks/dataflow.py
index ee3b3f6..7b21c2c 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -29,7 +29,7 @@ import uuid
 import warnings
 from copy import deepcopy
 from tempfile import TemporaryDirectory
-from typing import Any, Callable, Dict, List, Optional, TypeVar, cast
+from typing import Any, Callable, Dict, List, Optional, Sequence, TypeVar, 
Union, cast
 
 from googleapiclient.discovery import build
 
@@ -413,12 +413,17 @@ class DataflowHook(GoogleBaseHook):
 
     def __init__(
         self,
-        gcp_conn_id: str = 'google_cloud_default',
+        gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         poll_sleep: int = 10
     ) -> None:
         self.poll_sleep = poll_sleep
-        super().__init__(gcp_conn_id, delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
 
     def get_conn(self):
         """
diff --git a/airflow/providers/google/cloud/hooks/datafusion.py 
b/airflow/providers/google/cloud/hooks/datafusion.py
index 4015d4a..50d552c 100644
--- a/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/airflow/providers/google/cloud/hooks/datafusion.py
@@ -21,7 +21,7 @@ This module contains Google DataFusion hook.
 import json
 import os
 from time import monotonic, sleep
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, Sequence, Union
 from urllib.parse import quote, urlencode
 
 import google.auth
@@ -64,8 +64,13 @@ class DataFusionHook(GoogleBaseHook):
         api_version: str = "v1beta1",
         gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self.api_version = api_version
 
     def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
diff --git a/airflow/providers/google/cloud/hooks/datastore.py 
b/airflow/providers/google/cloud/hooks/datastore.py
index 7da30af..0dcf7f3 100644
--- a/airflow/providers/google/cloud/hooks/datastore.py
+++ b/airflow/providers/google/cloud/hooks/datastore.py
@@ -22,7 +22,7 @@ This module contains Google Datastore hook.
 
 import time
 import warnings
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, Sequence, Union
 
 from googleapiclient.discovery import build
 
@@ -42,8 +42,9 @@ class DatastoreHook(GoogleBaseHook):
 
     def __init__(
         self,
-        gcp_conn_id: str = 'google_cloud_default',
+        gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         api_version: str = 'v1',
         datastore_conn_id: Optional[str] = None
     ) -> None:
@@ -52,7 +53,11 @@ class DatastoreHook(GoogleBaseHook):
                 "The datastore_conn_id parameter has been deprecated. You 
should pass "
                 "the gcp_conn_id parameter.", DeprecationWarning, stacklevel=2)
             gcp_conn_id = datastore_conn_id
-        super().__init__(gcp_conn_id=gcp_conn_id, delegate_to=delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self.connection = None
         self.api_version = api_version
 
diff --git a/airflow/providers/google/cloud/hooks/dlp.py 
b/airflow/providers/google/cloud/hooks/dlp.py
index 79d2023..a8dcdce 100644
--- a/airflow/providers/google/cloud/hooks/dlp.py
+++ b/airflow/providers/google/cloud/hooks/dlp.py
@@ -52,14 +52,32 @@ class CloudDLPHook(GoogleBaseHook):
 
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
 
-    def __init__(self, gcp_conn_id: str = "google_cloud_default", delegate_to: 
Optional[str] = None) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._client = None
 
     def get_conn(self) -> DlpServiceClient:
diff --git a/airflow/providers/google/cloud/hooks/functions.py 
b/airflow/providers/google/cloud/hooks/functions.py
index 9d7439d..84328f0 100644
--- a/airflow/providers/google/cloud/hooks/functions.py
+++ b/airflow/providers/google/cloud/hooks/functions.py
@@ -19,7 +19,7 @@
 This module contains a Google Cloud Functions Hook.
 """
 import time
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Sequence, Union
 
 import requests
 from googleapiclient.discovery import build
@@ -44,10 +44,15 @@ class CloudFunctionsHook(GoogleBaseHook):
     def __init__(
         self,
         api_version: str,
-        gcp_conn_id: str = 'google_cloud_default',
-        delegate_to: Optional[str] = None
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self.api_version = api_version
 
     @staticmethod
diff --git a/airflow/providers/google/cloud/hooks/gcs.py 
b/airflow/providers/google/cloud/hooks/gcs.py
index 94523ff..949937b 100644
--- a/airflow/providers/google/cloud/hooks/gcs.py
+++ b/airflow/providers/google/cloud/hooks/gcs.py
@@ -28,7 +28,7 @@ from contextlib import contextmanager
 from io import BytesIO
 from os import path
 from tempfile import NamedTemporaryFile
-from typing import Callable, Optional, Set, Tuple, TypeVar, Union, cast
+from typing import Callable, Optional, Sequence, Set, Tuple, TypeVar, Union, 
cast
 from urllib.parse import urlparse
 
 from google.api_core.exceptions import NotFound
@@ -114,9 +114,10 @@ class GCSHook(GoogleBaseHook):
 
     def __init__(
         self,
-            gcp_conn_id: str = 'google_cloud_default',
-            delegate_to: Optional[str] = None,
-            google_cloud_storage_conn_id: Optional[str] = None
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        google_cloud_storage_conn_id: Optional[str] = None
     ) -> None:
         # To preserve backward compatibility
         # TODO: remove one day
@@ -125,7 +126,11 @@ class GCSHook(GoogleBaseHook):
                 "The google_cloud_storage_conn_id parameter has been 
deprecated. You should pass "
                 "the gcp_conn_id parameter.", DeprecationWarning, stacklevel=2)
             gcp_conn_id = google_cloud_storage_conn_id
-        super().__init__(gcp_conn_id=gcp_conn_id, delegate_to=delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
 
     def get_conn(self):
         """
diff --git a/airflow/providers/google/cloud/hooks/gdm.py 
b/airflow/providers/google/cloud/hooks/gdm.py
index 6f5af2f..60cebbf 100644
--- a/airflow/providers/google/cloud/hooks/gdm.py
+++ b/airflow/providers/google/cloud/hooks/gdm.py
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Sequence, Union
 
 from googleapiclient.discovery import build
 
@@ -31,8 +31,17 @@ class GoogleDeploymentManagerHook(GoogleBaseHook):  # 
pylint: disable=abstract-m
     This allows for scheduled and programatic inspection and deletion fo 
resources managed by GDM.
     """
 
-    def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None):
-        super(GoogleDeploymentManagerHook, self).__init__(gcp_conn_id, 
delegate_to=delegate_to)
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super(GoogleDeploymentManagerHook, self).__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
 
     def get_conn(self):
         """
diff --git a/airflow/providers/google/cloud/hooks/kms.py 
b/airflow/providers/google/cloud/hooks/kms.py
index 3309c46..419f3fd 100644
--- a/airflow/providers/google/cloud/hooks/kms.py
+++ b/airflow/providers/google/cloud/hooks/kms.py
@@ -22,7 +22,7 @@ This module contains a Google Cloud KMS hook.
 
 
 import base64
-from typing import Optional, Sequence, Tuple
+from typing import Optional, Sequence, Tuple, Union
 
 from google.api_core.retry import Retry
 from google.cloud.kms_v1 import KeyManagementServiceClient
@@ -47,18 +47,32 @@ class CloudKMSHook(GoogleBaseHook):
 
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
 
     def __init__(
         self,
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: Optional[str] = None
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id=gcp_conn_id, delegate_to=delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._conn = None  # type: Optional[KeyManagementServiceClient]
 
     def get_conn(self) -> KeyManagementServiceClient:
diff --git a/airflow/providers/google/cloud/hooks/kubernetes_engine.py 
b/airflow/providers/google/cloud/hooks/kubernetes_engine.py
index e2a3c14..d391a83 100644
--- a/airflow/providers/google/cloud/hooks/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/hooks/kubernetes_engine.py
@@ -22,7 +22,7 @@ This module contains a Google Kubernetes Engine Hook.
 
 import time
 import warnings
-from typing import Dict, Optional, Union
+from typing import Dict, Optional, Sequence, Union
 
 from google.api_core.exceptions import AlreadyExists, NotFound
 from google.api_core.gapic_v1.method import DEFAULT
@@ -49,12 +49,16 @@ class GKEHook(GoogleBaseHook):
 
     def __init__(
         self,
-        gcp_conn_id: str = 'google_cloud_default',
+        gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         location: Optional[str] = None
     ) -> None:
         super().__init__(
-            gcp_conn_id=gcp_conn_id, delegate_to=delegate_to)
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._client = None
         self.location = location
 
diff --git a/airflow/providers/google/cloud/hooks/life_sciences.py 
b/airflow/providers/google/cloud/hooks/life_sciences.py
index 8c1e516..11631de 100644
--- a/airflow/providers/google/cloud/hooks/life_sciences.py
+++ b/airflow/providers/google/cloud/hooks/life_sciences.py
@@ -18,7 +18,7 @@
 """Hook for Google Cloud Life Sciences service"""
 
 import time
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Sequence, Union
 
 import google.api_core.path_template
 from googleapiclient.discovery import build
@@ -42,10 +42,19 @@ class LifeSciencesHook(GoogleBaseHook):
     :type api_version: str
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
 
     _conn = None  # type: Optional[Any]
@@ -54,9 +63,14 @@ class LifeSciencesHook(GoogleBaseHook):
         self,
         api_version: str = "v2beta",
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: Optional[str] = None
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self.api_version = api_version
 
     def get_conn(self):
diff --git a/airflow/providers/google/cloud/hooks/natural_language.py 
b/airflow/providers/google/cloud/hooks/natural_language.py
index 326c4bf..b0b8b20 100644
--- a/airflow/providers/google/cloud/hooks/natural_language.py
+++ b/airflow/providers/google/cloud/hooks/natural_language.py
@@ -37,14 +37,32 @@ class CloudNaturalLanguageHook(GoogleBaseHook):
 
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
 
-    def __init__(self, gcp_conn_id: str = "google_cloud_default", delegate_to: 
Optional[str] = None) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._conn = None
 
     def get_conn(self) -> LanguageServiceClient:
diff --git a/airflow/providers/google/cloud/hooks/pubsub.py 
b/airflow/providers/google/cloud/hooks/pubsub.py
index 7c8d6d3..026a116 100644
--- a/airflow/providers/google/cloud/hooks/pubsub.py
+++ b/airflow/providers/google/cloud/hooks/pubsub.py
@@ -50,8 +50,17 @@ class PubSubHook(GoogleBaseHook):
     the project embedded in the Connection referenced by gcp_conn_id.
     """
 
-    def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to: 
Optional[str] = None) -> None:
-        super().__init__(gcp_conn_id, delegate_to=delegate_to)
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._client = None
 
     def get_conn(self) -> PublisherClient:
diff --git a/airflow/providers/google/cloud/hooks/secret_manager.py 
b/airflow/providers/google/cloud/hooks/secret_manager.py
index 3471dd4..a58cf62 100644
--- a/airflow/providers/google/cloud/hooks/secret_manager.py
+++ b/airflow/providers/google/cloud/hooks/secret_manager.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """Hook for Secrets Manager service"""
-from typing import Optional
+from typing import Optional, Sequence, Union
 
 from airflow.providers.google.cloud._internal_client.secret_manager_client 
import _SecretManagerClient  # noqa
 from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
@@ -34,17 +34,31 @@ class SecretsManagerHook(GoogleBaseHook):
 
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
     def __init__(
         self,
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: Optional[str] = None
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self.client = _SecretManagerClient(credentials=self._get_credentials())
 
     def get_conn(self) -> _SecretManagerClient:
diff --git a/airflow/providers/google/cloud/hooks/spanner.py 
b/airflow/providers/google/cloud/hooks/spanner.py
index 0a03d19..bba2c04 100644
--- a/airflow/providers/google/cloud/hooks/spanner.py
+++ b/airflow/providers/google/cloud/hooks/spanner.py
@@ -18,7 +18,7 @@
 """
 This module contains a Google Cloud Spanner Hook.
 """
-from typing import Callable, List, Optional
+from typing import Callable, List, Optional, Sequence, Union
 
 from google.api_core.exceptions import AlreadyExists, GoogleAPICallError
 from google.cloud.spanner_v1.client import Client
@@ -39,8 +39,17 @@ class SpannerHook(GoogleBaseHook):
     keyword arguments rather than positional.
     """
 
-    def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to: 
Optional[str] = None) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._client = None
 
     def _get_client(self, project_id: str) -> Client:
diff --git a/airflow/providers/google/cloud/hooks/speech_to_text.py 
b/airflow/providers/google/cloud/hooks/speech_to_text.py
index 5cf6929..6cb943c 100644
--- a/airflow/providers/google/cloud/hooks/speech_to_text.py
+++ b/airflow/providers/google/cloud/hooks/speech_to_text.py
@@ -18,7 +18,7 @@
 """
 This module contains a Google Cloud Speech Hook.
 """
-from typing import Dict, Optional, Union
+from typing import Dict, Optional, Sequence, Union
 
 from google.api_core.retry import Retry
 from google.cloud.speech_v1 import SpeechClient
@@ -33,14 +33,32 @@ class CloudSpeechToTextHook(GoogleBaseHook):
 
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
 
-    def __init__(self, gcp_conn_id: str = "google_cloud_default", delegate_to: 
Optional[str] = None) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._client = None
 
     def get_conn(self) -> SpeechClient:
diff --git a/airflow/providers/google/cloud/hooks/stackdriver.py 
b/airflow/providers/google/cloud/hooks/stackdriver.py
index e0ebd7b..2aec897 100644
--- a/airflow/providers/google/cloud/hooks/stackdriver.py
+++ b/airflow/providers/google/cloud/hooks/stackdriver.py
@@ -21,7 +21,7 @@ This module contains GCP Stackdriver operators.
 """
 
 import json
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Sequence, Union
 
 from google.api_core.exceptions import InvalidArgument
 from google.api_core.gapic_v1.method import DEFAULT
@@ -38,8 +38,17 @@ class StackdriverHook(GoogleBaseHook):
     Stackdriver Hook for connecting with GCP Stackdriver
     """
 
-    def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None):
-        super().__init__(gcp_conn_id, delegate_to)
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._policy_client = None
         self._channel_client = None
 
diff --git a/airflow/providers/google/cloud/hooks/tasks.py 
b/airflow/providers/google/cloud/hooks/tasks.py
index f28b58a..e324012 100644
--- a/airflow/providers/google/cloud/hooks/tasks.py
+++ b/airflow/providers/google/cloud/hooks/tasks.py
@@ -41,14 +41,32 @@ class CloudTasksHook(GoogleBaseHook):
 
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
 
-    def __init__(self, gcp_conn_id="google_cloud_default", delegate_to=None):
-        super().__init__(gcp_conn_id, delegate_to)
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._client = None
 
     def get_conn(self):
diff --git a/airflow/providers/google/cloud/hooks/text_to_speech.py 
b/airflow/providers/google/cloud/hooks/text_to_speech.py
index 52f3df7..7e16cd3 100644
--- a/airflow/providers/google/cloud/hooks/text_to_speech.py
+++ b/airflow/providers/google/cloud/hooks/text_to_speech.py
@@ -18,7 +18,7 @@
 """
 This module contains a Google Cloud Text to Speech Hook.
 """
-from typing import Dict, Optional, Union
+from typing import Dict, Optional, Sequence, Union
 
 from google.api_core.retry import Retry
 from google.cloud.texttospeech_v1 import TextToSpeechClient
@@ -38,14 +38,32 @@ class CloudTextToSpeechHook(GoogleBaseHook):
 
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
 
-    def __init__(self, gcp_conn_id: str = "google_cloud_default", delegate_to: 
Optional[str] = None) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._client = None  # type: Optional[TextToSpeechClient]
 
     def get_conn(self) -> TextToSpeechClient:
diff --git a/airflow/providers/google/cloud/hooks/translate.py 
b/airflow/providers/google/cloud/hooks/translate.py
index c6ee3e7..c3fd653 100644
--- a/airflow/providers/google/cloud/hooks/translate.py
+++ b/airflow/providers/google/cloud/hooks/translate.py
@@ -18,7 +18,7 @@
 """
 This module contains a Google Cloud Translate Hook.
 """
-from typing import Dict, List, Optional, Union
+from typing import Dict, List, Optional, Sequence, Union
 
 from google.cloud.translate_v2 import Client
 
@@ -33,8 +33,17 @@ class CloudTranslateHook(GoogleBaseHook):
     keyword arguments rather than positional.
     """
 
-    def __init__(self, gcp_conn_id: str = 'google_cloud_default') -> None:
-        super().__init__(gcp_conn_id)
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._client = None  # type: Optional[Client]
 
     def get_conn(self) -> Client:
diff --git a/airflow/providers/google/cloud/hooks/video_intelligence.py 
b/airflow/providers/google/cloud/hooks/video_intelligence.py
index 215a29a..a1f1c3f 100644
--- a/airflow/providers/google/cloud/hooks/video_intelligence.py
+++ b/airflow/providers/google/cloud/hooks/video_intelligence.py
@@ -37,14 +37,32 @@ class CloudVideoIntelligenceHook(GoogleBaseHook):
 
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
 
-    def __init__(self, gcp_conn_id: str = "google_cloud_default", delegate_to: 
Optional[str] = None) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._conn = None
 
     def get_conn(self) -> VideoIntelligenceServiceClient:
diff --git a/airflow/providers/google/cloud/hooks/vision.py 
b/airflow/providers/google/cloud/hooks/vision.py
index b858a92..84c3df5 100644
--- a/airflow/providers/google/cloud/hooks/vision.py
+++ b/airflow/providers/google/cloud/hooks/vision.py
@@ -129,8 +129,17 @@ class CloudVisionHook(GoogleBaseHook):
         'ProductSet', 'productset_id', ProductSearchClient.product_set_path
     )
 
-    def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to: 
Optional[str] = None) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self._client = None
 
     def get_conn(self) -> ProductSearchClient:
diff --git a/airflow/providers/google/cloud/utils/credentials_provider.py 
b/airflow/providers/google/cloud/utils/credentials_provider.py
index 76234f8..93cb9e9 100644
--- a/airflow/providers/google/cloud/utils/credentials_provider.py
+++ b/airflow/providers/google/cloud/utils/credentials_provider.py
@@ -23,12 +23,13 @@ import json
 import logging
 import tempfile
 from contextlib import ExitStack, contextmanager
-from typing import Collection, Dict, Optional, Sequence, Tuple
+from typing import Collection, Dict, Optional, Sequence, Tuple, Union
 from urllib.parse import urlencode
 
 import google.auth
 import google.auth.credentials
 import google.oauth2.service_account
+from google.auth import impersonated_credentials
 from google.auth.environment_vars import CREDENTIALS, LEGACY_PROJECT, PROJECT
 
 from airflow.exceptions import AirflowException
@@ -191,12 +192,22 @@ class _CredentialProvider(LoggingMixin):
     :type keyfile_dict: Dict[str, str]
     :param scopes:  OAuth scopes for the connection
     :type scopes: Collection[str]
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
     :param disable_logging: If true, disable all log messages, which allows 
you to use this
         class to configure Logger.
+    :param target_principal: The service account to directly impersonate using 
short-term
+        credentials, if any. For this to work, the target_principal account 
must grant
+        the originating account the Service Account Token Creator IAM role.
+    :type target_principal: str
+    :param delegates: optional chained list of accounts required to get the 
access_token of
+        target_principal. If set, the sequence of identities from the list 
must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
and target_principal
+        granting the role to the last account from the list.
+    :type delegates: Sequence[str]
     """
     def __init__(
         self,
@@ -205,7 +216,9 @@ class _CredentialProvider(LoggingMixin):
         # See: https://github.com/PyCQA/pylint/issues/2377
         scopes: Optional[Collection[str]] = None,  # pylint: 
disable=unsubscriptable-object
         delegate_to: Optional[str] = None,
-        disable_logging: bool = False
+        disable_logging: bool = False,
+        target_principal: Optional[str] = None,
+        delegates: Optional[Sequence[str]] = None,
     ):
         super().__init__()
         if key_path and keyfile_dict:
@@ -218,6 +231,8 @@ class _CredentialProvider(LoggingMixin):
         self.scopes = scopes
         self.delegate_to = delegate_to
         self.disable_logging = disable_logging
+        self.target_principal = target_principal
+        self.delegates = delegates
 
     def get_credentials_and_project(self):
         """
@@ -243,6 +258,14 @@ class _CredentialProvider(LoggingMixin):
                     "Please use service-account for authorization."
                 )
 
+        if self.target_principal:
+            credentials = impersonated_credentials.Credentials(
+                source_credentials=credentials,
+                target_principal=self.target_principal,
+                delegates=self.delegates,
+                target_scopes=self.scopes
+            )
+
         return credentials, project_id
 
     def _get_credentials_using_keyfile_dict(self):
@@ -311,3 +334,27 @@ def _get_scopes(scopes: Optional[str] = None) -> 
Sequence[str]:
     """
     return [s.strip() for s in scopes.split(',')] \
         if scopes else _DEFAULT_SCOPES
+
+
+def _get_target_principal_and_delegates(
+    impersonation_chain: Optional[Union[str, Sequence[str]]] = None
+) -> Tuple[Optional[str], Optional[Sequence[str]]]:
+    """
+    Analyze contents of impersonation_chain and return target_principal (the 
service account
+    to directly impersonate using short-term credentials, if any) and optional 
list of delegates
+    required to get the access_token of target_principal.
+
+    :param impersonation_chain: the service account to impersonate or a 
chained list leading to this
+        account
+    :type impersonation_chain: Optional[Union[str, Sequence[str]]]
+
+    :return: Returns the tuple of target_principal and delegates
+    :rtype: Tuple[Optional[str], Optional[Sequence[str]]]
+    """
+    if not impersonation_chain:
+        return None, None
+
+    if isinstance(impersonation_chain, str):
+        return impersonation_chain, None
+
+    return impersonation_chain[-1], impersonation_chain[:-1]
diff --git a/airflow/providers/google/common/hooks/base_google.py 
b/airflow/providers/google/common/hooks/base_google.py
index 4e942a7..62e22e1 100644
--- a/airflow/providers/google/common/hooks/base_google.py
+++ b/airflow/providers/google/common/hooks/base_google.py
@@ -26,7 +26,7 @@ import os
 import tempfile
 from contextlib import contextmanager
 from subprocess import check_output
-from typing import Any, Callable, Dict, Optional, Sequence, Tuple, TypeVar, 
cast
+from typing import Any, Callable, Dict, Optional, Sequence, Tuple, TypeVar, 
Union, cast
 
 import google.auth
 import google.auth.credentials
@@ -44,7 +44,7 @@ from airflow import version
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 from airflow.providers.google.cloud.utils.credentials_provider import (
-    _get_scopes, get_credentials_and_project_id,
+    _get_scopes, _get_target_principal_and_delegates, 
get_credentials_and_project_id,
 )
 from airflow.utils.process_utils import patch_environ
 
@@ -148,16 +148,31 @@ class GoogleBaseHook(BaseHook):
 
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
 
-    def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to: 
Optional[str] = None) -> None:
+    def __init__(
+        self,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
         super().__init__()
         self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
         self.extras = self.get_connection(self.gcp_conn_id).extra_dejson  # 
type: Dict
         self._cached_credentials: 
Optional[google.auth.credentials.Credentials] = None
         self._cached_project_id: Optional[str] = None
@@ -178,11 +193,15 @@ class GoogleBaseHook(BaseHook):
         except json.decoder.JSONDecodeError:
             raise AirflowException('Invalid key JSON.')
 
+        target_principal, delegates = 
_get_target_principal_and_delegates(self.impersonation_chain)
+
         credentials, project_id = get_credentials_and_project_id(
             key_path=key_path,
             keyfile_dict=keyfile_dict_json,
             scopes=self.scopes,
-            delegate_to=self.delegate_to
+            delegate_to=self.delegate_to,
+            target_principal=target_principal,
+            delegates=delegates,
         )
 
         overridden_project_id = self._get_field('project')
diff --git a/airflow/providers/google/common/hooks/discovery_api.py 
b/airflow/providers/google/common/hooks/discovery_api.py
index 22adbb0..b001b54 100644
--- a/airflow/providers/google/common/hooks/discovery_api.py
+++ b/airflow/providers/google/common/hooks/discovery_api.py
@@ -19,7 +19,7 @@
 """
 This module allows you to connect to the Google Discovery API Service and 
query it.
 """
-from typing import Dict, Optional
+from typing import Dict, Optional, Sequence, Union
 
 from googleapiclient.discovery import Resource, build
 
@@ -37,10 +37,19 @@ class GoogleDiscoveryApiHook(GoogleBaseHook):
     :type api_version: str
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
     _conn = None  # type: Optional[Resource]
 
@@ -48,10 +57,15 @@ class GoogleDiscoveryApiHook(GoogleBaseHook):
         self,
         api_service_name: str,
         api_version: str,
-        gcp_conn_id='google_cloud_default',
-        delegate_to: Optional[str] = None
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id=gcp_conn_id, delegate_to=delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self.api_service_name = api_service_name
         self.api_version = api_version
 
diff --git a/airflow/providers/google/firebase/hooks/firestore.py 
b/airflow/providers/google/firebase/hooks/firestore.py
index 162ca4d..2da903d 100644
--- a/airflow/providers/google/firebase/hooks/firestore.py
+++ b/airflow/providers/google/firebase/hooks/firestore.py
@@ -18,7 +18,7 @@
 """Hook for Google Cloud Firestore service"""
 
 import time
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Sequence, Union
 
 from googleapiclient.discovery import build, build_from_document
 
@@ -41,10 +41,19 @@ class CloudFirestoreHook(GoogleBaseHook):
     :type api_version: str
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
 
     _conn = None  # type: Optional[Any]
@@ -54,8 +63,13 @@ class CloudFirestoreHook(GoogleBaseHook):
         api_version: str = "v1",
         gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self.api_version = api_version
 
     def get_conn(self):
diff --git 
a/airflow/providers/google/marketing_platform/hooks/campaign_manager.py 
b/airflow/providers/google/marketing_platform/hooks/campaign_manager.py
index 77c1f56..5c8ccdd 100644
--- a/airflow/providers/google/marketing_platform/hooks/campaign_manager.py
+++ b/airflow/providers/google/marketing_platform/hooks/campaign_manager.py
@@ -18,7 +18,7 @@
 """
 This module contains Google Campaign Manager hook.
 """
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Sequence, Union
 
 from googleapiclient import http
 from googleapiclient.discovery import Resource, build
@@ -39,8 +39,13 @@ class GoogleCampaignManagerHook(GoogleBaseHook):
         api_version: str = "v3.3",
         gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self.api_version = api_version
 
     def get_conn(self) -> Resource:
diff --git a/airflow/providers/google/marketing_platform/hooks/display_video.py 
b/airflow/providers/google/marketing_platform/hooks/display_video.py
index c2c77ee..c3c7364 100644
--- a/airflow/providers/google/marketing_platform/hooks/display_video.py
+++ b/airflow/providers/google/marketing_platform/hooks/display_video.py
@@ -19,7 +19,7 @@
 This module contains Google DisplayVideo hook.
 """
 
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Sequence, Union
 
 from googleapiclient.discovery import Resource, build
 
@@ -38,8 +38,13 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
         api_version: str = "v1",
         gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self.api_version = api_version
 
     def get_conn(self) -> Resource:
diff --git a/airflow/providers/google/marketing_platform/hooks/search_ads.py 
b/airflow/providers/google/marketing_platform/hooks/search_ads.py
index 2264cc1..f6342aa 100644
--- a/airflow/providers/google/marketing_platform/hooks/search_ads.py
+++ b/airflow/providers/google/marketing_platform/hooks/search_ads.py
@@ -18,7 +18,7 @@
 """
 This module contains Google Search Ads 360 hook.
 """
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Sequence, Union
 
 from googleapiclient.discovery import build
 
@@ -37,8 +37,13 @@ class GoogleSearchAdsHook(GoogleBaseHook):
         api_version: str = "v2",
         gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self.api_version = api_version
 
     def get_conn(self):
diff --git a/airflow/providers/google/suite/hooks/drive.py 
b/airflow/providers/google/suite/hooks/drive.py
index bac3557..16613fb 100644
--- a/airflow/providers/google/suite/hooks/drive.py
+++ b/airflow/providers/google/suite/hooks/drive.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """Hook for Google Drive service"""
-from typing import Any, Optional
+from typing import Any, Optional, Sequence, Union
 
 from googleapiclient.discovery import Resource, build
 from googleapiclient.http import MediaFileUpload
@@ -33,10 +33,19 @@ class GoogleDriveHook(GoogleBaseHook):
     :type api_version: str
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
 
     _conn = None  # type: Optional[Resource]
@@ -45,9 +54,14 @@ class GoogleDriveHook(GoogleBaseHook):
         self,
         api_version: str = "v3",
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: Optional[str] = None
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self.api_version = api_version
 
     def get_conn(self) -> Any:
diff --git a/airflow/providers/google/suite/hooks/sheets.py 
b/airflow/providers/google/suite/hooks/sheets.py
index 71e5b4e..2f973b3 100644
--- a/airflow/providers/google/suite/hooks/sheets.py
+++ b/airflow/providers/google/suite/hooks/sheets.py
@@ -20,7 +20,7 @@
 This module contains a Google Sheets API hook
 """
 
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Sequence, Union
 
 from googleapiclient.discovery import build
 
@@ -38,19 +38,33 @@ class GSheetsHook(GoogleBaseHook):
     :type gcp_conn_id: str
     :param api_version: API Version
     :type api_version: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
     """
 
     def __init__(
         self,
         gcp_conn_id: str = 'google_cloud_default',
         api_version: str = 'v4',
-        delegate_to: Optional[str] = None
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
-        super().__init__(gcp_conn_id, delegate_to)
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
         self.gcp_conn_id = gcp_conn_id
         self.api_version = api_version
         self.delegate_to = delegate_to
diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py 
b/tests/providers/google/cloud/hooks/test_bigquery.py
index 8286a8e..a9f5472 100644
--- a/tests/providers/google/cloud/hooks/test_bigquery.py
+++ b/tests/providers/google/cloud/hooks/test_bigquery.py
@@ -84,7 +84,11 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
                           "You should pass the gcp_conn_id parameter."
         with self.assertWarns(DeprecationWarning) as warn:
             BigQueryHook(bigquery_conn_id=bigquery_conn_id)
-            mock_base_hook_init.assert_called_once_with(delegate_to=None, 
gcp_conn_id='bigquery conn id')
+            mock_base_hook_init.assert_called_once_with(
+                delegate_to=None,
+                gcp_conn_id='bigquery conn id',
+                impersonation_chain=None,
+            )
         self.assertEqual(warning_message, str(warn.warning))
 
     
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_service")
diff --git a/tests/providers/google/cloud/hooks/test_dataflow.py 
b/tests/providers/google/cloud/hooks/test_dataflow.py
index d3568f3..0b2ad7d 100644
--- a/tests/providers/google/cloud/hooks/test_dataflow.py
+++ b/tests/providers/google/cloud/hooks/test_dataflow.py
@@ -142,7 +142,12 @@ class TestFallbackToVariables(unittest.TestCase):
             FixutureFallback().test_fn({'project': "TEST"}, "TEST2")
 
 
-def mock_init(self, gcp_conn_id, delegate_to=None):  # pylint: 
disable=unused-argument
+def mock_init(
+    self,
+    gcp_conn_id,
+    delegate_to=None,
+    impersonation_chain=None,
+):  # pylint: disable=unused-argument
     pass
 
 
diff --git a/tests/providers/google/cloud/hooks/test_datastore.py 
b/tests/providers/google/cloud/hooks/test_datastore.py
index b1a0cde..93dd663 100644
--- a/tests/providers/google/cloud/hooks/test_datastore.py
+++ b/tests/providers/google/cloud/hooks/test_datastore.py
@@ -28,7 +28,12 @@ from airflow.providers.google.cloud.hooks.datastore import 
DatastoreHook
 GCP_PROJECT_ID = "test"
 
 
-def mock_init(self, gcp_conn_id, delegate_to=None):  # pylint: 
disable=unused-argument
+def mock_init(
+    self,
+    gcp_conn_id,
+    delegate_to=None,
+    impersonation_chain=None,
+):  # pylint: disable=unused-argument
     pass
 
 
diff --git a/tests/providers/google/cloud/hooks/test_gdm.py 
b/tests/providers/google/cloud/hooks/test_gdm.py
index e11f9a4..67e00e2 100644
--- a/tests/providers/google/cloud/hooks/test_gdm.py
+++ b/tests/providers/google/cloud/hooks/test_gdm.py
@@ -23,7 +23,12 @@ from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.hooks.gdm import 
GoogleDeploymentManagerHook
 
 
-def mock_init(self, gcp_conn_id, delegate_to=None):  # pylint: 
disable=unused-argument
+def mock_init(
+    self,
+    gcp_conn_id,
+    delegate_to=None,
+    impersonation_chain=None,
+):  # pylint: disable=unused-argument
     pass
 
 
diff --git a/tests/providers/google/cloud/hooks/test_kms.py 
b/tests/providers/google/cloud/hooks/test_kms.py
index 525839d..a1b1db2 100644
--- a/tests/providers/google/cloud/hooks/test_kms.py
+++ b/tests/providers/google/cloud/hooks/test_kms.py
@@ -45,7 +45,12 @@ TEST_KEY_ID = 
"projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}".format(
 RESPONSE = Response(PLAINTEXT, PLAINTEXT)
 
 
-def mock_init(self, gcp_conn_id, delegate_to=None):  # pylint: 
disable=unused-argument
+def mock_init(
+    self,
+    gcp_conn_id,
+    delegate_to=None,
+    impersonation_chain=None,
+):  # pylint: disable=unused-argument
     pass
 
 
diff --git a/tests/providers/google/cloud/hooks/test_pubsub.py 
b/tests/providers/google/cloud/hooks/test_pubsub.py
index 0d3b992..f208781 100644
--- a/tests/providers/google/cloud/hooks/test_pubsub.py
+++ b/tests/providers/google/cloud/hooks/test_pubsub.py
@@ -51,7 +51,12 @@ EXPANDED_SUBSCRIPTION = 
'projects/{}/subscriptions/{}'.format(TEST_PROJECT, TEST
 LABELS = {'airflow-version': 'v' + version.replace('.', '-').replace('+', '-')}
 
 
-def mock_init(self, gcp_conn_id, delegate_to=None):  # pylint: 
disable=unused-argument
+def mock_init(
+    self,
+    gcp_conn_id,
+    delegate_to=None,
+    impersonation_chain=None,
+):  # pylint: disable=unused-argument
     pass
 
 
diff --git a/tests/providers/google/cloud/utils/base_gcp_mock.py 
b/tests/providers/google/cloud/utils/base_gcp_mock.py
index 04b796c..d20696b 100644
--- a/tests/providers/google/cloud/utils/base_gcp_mock.py
+++ b/tests/providers/google/cloud/utils/base_gcp_mock.py
@@ -23,22 +23,34 @@ from airflow.models import Connection
 GCP_PROJECT_ID_HOOK_UNIT_TEST = 'example-project'
 
 
-def mock_base_gcp_hook_default_project_id(self, 
gcp_conn_id='google_cloud_default', delegate_to=None):
+def mock_base_gcp_hook_default_project_id(
+    self,
+    gcp_conn_id='google_cloud_default',
+    delegate_to=None,
+    impersonation_chain=None,
+):
     self.extras = {
         'extra__google_cloud_platform__project': GCP_PROJECT_ID_HOOK_UNIT_TEST
     }
     self._conn = gcp_conn_id
     self.delegate_to = delegate_to
+    self.impersonation_chain = impersonation_chain
     self._client = None
     self._conn = None
     self._cached_credentials = None
     self._cached_project_id = None
 
 
-def mock_base_gcp_hook_no_default_project_id(self, 
gcp_conn_id='google_cloud_default', delegate_to=None):
+def mock_base_gcp_hook_no_default_project_id(
+    self,
+    gcp_conn_id='google_cloud_default',
+    delegate_to=None,
+    impersonation_chain=None,
+):
     self.extras = {}
     self._conn = gcp_conn_id
     self.delegate_to = delegate_to
+    self.impersonation_chain = impersonation_chain
     self._client = None
     self._conn = None
     self._cached_credentials = None
diff --git a/tests/providers/google/cloud/utils/test_credentials_provider.py 
b/tests/providers/google/cloud/utils/test_credentials_provider.py
index 0827de1..5066bb5 100644
--- a/tests/providers/google/cloud/utils/test_credentials_provider.py
+++ b/tests/providers/google/cloud/utils/test_credentials_provider.py
@@ -28,8 +28,8 @@ from parameterized import parameterized
 
 from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.utils.credentials_provider import (
-    _DEFAULT_SCOPES, AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT, _get_scopes, 
build_gcp_conn,
-    get_credentials_and_project_id, provide_gcp_conn_and_credentials, 
provide_gcp_connection,
+    _DEFAULT_SCOPES, AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT, _get_scopes, 
_get_target_principal_and_delegates,
+    build_gcp_conn, get_credentials_and_project_id, 
provide_gcp_conn_and_credentials, provide_gcp_connection,
     provide_gcp_credentials,
 )
 
@@ -171,6 +171,69 @@ class TestGetGcpCredentialsAndProjectId(unittest.TestCase):
         mock_auth_default.assert_called_once_with(scopes=scopes)
         self.assertEqual(mock_auth_default.return_value, result)
 
+    @mock.patch('airflow.providers.google.cloud.utils.credentials_provider.'
+                'impersonated_credentials.Credentials')
+    @mock.patch('google.auth.default')
+    def 
test_get_credentials_and_project_id_with_default_auth_and_target_principal(
+        self, mock_auth_default, mock_impersonated_credentials
+    ):
+        mock_credentials = mock.MagicMock()
+        mock_auth_default.return_value = (mock_credentials, 
self.test_project_id)
+
+        result = get_credentials_and_project_id(target_principal="ACCOUNT_1")
+        mock_auth_default.assert_called_once_with(scopes=None)
+        mock_impersonated_credentials.assert_called_once_with(
+            source_credentials=mock_credentials,
+            target_principal="ACCOUNT_1",
+            delegates=None,
+            target_scopes=None,
+        )
+        self.assertEqual((mock_impersonated_credentials.return_value, 
self.test_project_id), result)
+
+    @mock.patch('airflow.providers.google.cloud.utils.credentials_provider.'
+                'impersonated_credentials.Credentials')
+    @mock.patch('google.auth.default')
+    def 
test_get_credentials_and_project_id_with_default_auth_and_scopes_and_target_principal(
+        self, mock_auth_default, mock_impersonated_credentials
+    ):
+        mock_credentials = mock.MagicMock()
+        mock_auth_default.return_value = (mock_credentials, 
self.test_project_id)
+
+        result = get_credentials_and_project_id(
+            scopes=['scope1', 'scope2'],
+            target_principal="ACCOUNT_1",
+        )
+        mock_auth_default.assert_called_once_with(scopes=['scope1', 'scope2'])
+        mock_impersonated_credentials.assert_called_once_with(
+            source_credentials=mock_credentials,
+            target_principal="ACCOUNT_1",
+            delegates=None,
+            target_scopes=['scope1', 'scope2'],
+        )
+        self.assertEqual((mock_impersonated_credentials.return_value, 
self.test_project_id), result)
+
+    @mock.patch('airflow.providers.google.cloud.utils.credentials_provider.'
+                'impersonated_credentials.Credentials')
+    @mock.patch('google.auth.default')
+    def 
test_get_credentials_and_project_id_with_default_auth_and_target_principal_and_delegates(
+        self, mock_auth_default, mock_impersonated_credentials
+    ):
+        mock_credentials = mock.MagicMock()
+        mock_auth_default.return_value = (mock_credentials, 
self.test_project_id)
+
+        result = get_credentials_and_project_id(
+            target_principal="ACCOUNT_3",
+            delegates=["ACCOUNT_1", "ACCOUNT_2"],
+        )
+        mock_auth_default.assert_called_once_with(scopes=None)
+        mock_impersonated_credentials.assert_called_once_with(
+            source_credentials=mock_credentials,
+            target_principal="ACCOUNT_3",
+            delegates=["ACCOUNT_1", "ACCOUNT_2"],
+            target_scopes=None,
+        )
+        self.assertEqual((mock_impersonated_credentials.return_value, 
self.test_project_id), result)
+
     @mock.patch(
         'google.oauth2.service_account.Credentials.from_service_account_file',
     )
@@ -256,3 +319,24 @@ class TestGetScopes(unittest.TestCase):
     ])
     def test_get_scopes_with_input(self, _, scopes_str, scopes):
         self.assertEqual(_get_scopes(scopes_str), scopes)
+
+
+class TestGetTargetPrincipalAndDelegates(unittest.TestCase):
+
+    def test_get_target_principal_and_delegates_no_argument(self):
+        self.assertEqual(_get_target_principal_and_delegates(), (None, None))
+
+    @parameterized.expand([
+        ('string', "ACCOUNT_1", ("ACCOUNT_1", None)),
+        ('empty_list', [], (None, None)),
+        ('single_element_list', ["ACCOUNT_1"], ("ACCOUNT_1", [])),
+        ('multiple_elements_list',
+         ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"], ("ACCOUNT_3", ["ACCOUNT_1", 
"ACCOUNT_2"])),
+    ])
+    def test_get_target_principal_and_delegates_with_input(
+        self, _, impersonation_chain, target_principal_and_delegates
+    ):
+        self.assertEqual(
+            _get_target_principal_and_delegates(impersonation_chain),
+            target_principal_and_delegates
+        )
diff --git a/tests/providers/google/common/hooks/test_base_google.py 
b/tests/providers/google/common/hooks/test_base_google.py
index 4f1e91d..5be2e69 100644
--- a/tests/providers/google/common/hooks/test_base_google.py
+++ b/tests/providers/google/common/hooks/test_base_google.py
@@ -28,6 +28,7 @@ import tenacity
 from google.auth.environment_vars import CREDENTIALS
 from google.auth.exceptions import GoogleAuthError
 from google.cloud.exceptions import Forbidden
+from parameterized import parameterized
 
 from airflow import version
 from airflow.exceptions import AirflowException
@@ -43,6 +44,7 @@ except GoogleAuthError:
     default_creds_available = False
 
 MODULE_NAME = "airflow.providers.google.common.hooks.base_google"
+PROJECT_ID = "PROJECT_ID"
 
 
 class NoForbiddenAfterCount:
@@ -341,7 +343,10 @@ class TestGoogleBaseHook(unittest.TestCase):
             key_path=None,
             keyfile_dict=None,
             scopes=self.instance.scopes,
-            delegate_to=None)
+            delegate_to=None,
+            target_principal=None,
+            delegates=None,
+        )
         self.assertEqual(('CREDENTIALS', 'PROJECT_ID'), result)
 
     @mock.patch(MODULE_NAME + '.get_credentials_and_project_id')
@@ -359,7 +364,9 @@ class TestGoogleBaseHook(unittest.TestCase):
             key_path='KEY_PATH.json',
             keyfile_dict=None,
             scopes=self.instance.scopes,
-            delegate_to=None
+            delegate_to=None,
+            target_principal=None,
+            delegates=None,
         )
         self.assertEqual((mock_credentials, 'PROJECT_ID'), result)
 
@@ -399,7 +406,9 @@ class TestGoogleBaseHook(unittest.TestCase):
             key_path=None,
             keyfile_dict=service_account,
             scopes=self.instance.scopes,
-            delegate_to=None
+            delegate_to=None,
+            target_principal=None,
+            delegates=None,
         )
         self.assertEqual((mock_credentials, 'PROJECT_ID'), result)
 
@@ -417,7 +426,9 @@ class TestGoogleBaseHook(unittest.TestCase):
             key_path=None,
             keyfile_dict=None,
             scopes=self.instance.scopes,
-            delegate_to="USER"
+            delegate_to="USER",
+            target_principal=None,
+            delegates=None,
         )
         self.assertEqual((mock_credentials, "PROJECT_ID"), result)
 
@@ -451,7 +462,9 @@ class TestGoogleBaseHook(unittest.TestCase):
             key_path=None,
             keyfile_dict=None,
             scopes=self.instance.scopes,
-            delegate_to=None
+            delegate_to=None,
+            target_principal=None,
+            delegates=None,
         )
         self.assertEqual(("CREDENTIALS", 'SECOND_PROJECT_ID'), result)
 
@@ -632,6 +645,35 @@ class TestGoogleBaseHook(unittest.TestCase):
         http_authorized = self.instance._authorize().http
         self.assertNotEqual(http_authorized.timeout, None)
 
+    @parameterized.expand([
+        ('string', "ACCOUNT_1", "ACCOUNT_1", None),
+        ('single_element_list', ["ACCOUNT_1"], "ACCOUNT_1", []),
+        ('multiple_elements_list',
+         ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"], "ACCOUNT_3", ["ACCOUNT_1", 
"ACCOUNT_2"]),
+    ])
+    @mock.patch(MODULE_NAME + '.get_credentials_and_project_id')
+    def test_get_credentials_and_project_id_with_impersonation_chain(
+        self,
+        _,
+        impersonation_chain,
+        target_principal,
+        delegates,
+        mock_get_creds_and_proj_id,
+    ):
+        mock_credentials = mock.MagicMock()
+        mock_get_creds_and_proj_id.return_value = (mock_credentials, 
PROJECT_ID)
+        self.instance.impersonation_chain = impersonation_chain
+        result = self.instance._get_credentials_and_project_id()
+        mock_get_creds_and_proj_id.assert_called_once_with(
+            key_path=None,
+            keyfile_dict=None,
+            scopes=self.instance.scopes,
+            delegate_to=None,
+            target_principal=target_principal,
+            delegates=delegates,
+        )
+        self.assertEqual((mock_credentials, PROJECT_ID), result)
+
 
 class TestProvideAuthorizedGcloud(unittest.TestCase):
     def setUp(self):

Reply via email to