This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0f513472a7 Add `impersonation_scopes` to BigQuery (#38169)
0f513472a7 is described below
commit 0f513472a72c04757a3fd16d4805b9048253a05c
Author: Ying <[email protected]>
AuthorDate: Sat Mar 30 01:03:38 2024 -0700
Add `impersonation_scopes` to BigQuery (#38169)
---
airflow/providers/google/cloud/hooks/bigquery.py | 19 +++++++++++++++++++
airflow/providers/google/cloud/operators/bigquery.py | 3 +++
.../providers/google/cloud/operators/test_bigquery.py | 15 +++++++++++++++
3 files changed, 37 insertions(+)
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index eed3b8ed48..8e73ea46d2 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -56,6 +56,7 @@ from sqlalchemy import create_engine
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.providers.common.sql.hooks.sql import DbApiHook
from airflow.providers.google.cloud.utils.bigquery import bq_cast
+from airflow.providers.google.cloud.utils.credentials_provider import
_get_scopes
from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.hooks.base_google import
GoogleBaseAsyncHook, GoogleBaseHook, get_field
@@ -92,6 +93,8 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
Google BigQuery jobs.
:param impersonation_chain: This is the optional service account to
impersonate using short term credentials.
+ :param impersonation_scopes: Optional list of scopes for impersonated
account.
+ Will override scopes from connection.
:param labels: The BigQuery resource label.
"""
@@ -108,6 +111,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
priority: str = "INTERACTIVE",
api_resource_configs: dict | None = None,
impersonation_chain: str | Sequence[str] | None = None,
+ impersonation_scopes: str | Sequence[str] | None = None,
labels: dict | None = None,
**kwargs,
) -> None:
@@ -127,6 +131,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
self.api_resource_configs: dict = api_resource_configs or {}
self.labels = labels
self.credentials_path = "bigquery_hook_credentials.json"
+ self.impersonation_scopes = impersonation_scopes
def get_conn(self) -> BigQueryConnection:
"""Get a BigQuery PEP 249 connection object."""
@@ -2335,6 +2340,20 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
return project_id, dataset_id, table_id
+    @property
+    def scopes(self) -> Sequence[str]:
+        """
+        Return OAuth 2.0 scopes.
+
+        :return: ``impersonation_scopes`` if ``impersonation_chain`` is set, else connection/default scope.
+        """
+        requested = self.impersonation_scopes
+        if self.impersonation_chain and requested:
+            scope_value: str | None = requested if isinstance(requested, str) else ",".join(requested)
+        else:
+            scope_value = self._get_field("scope", None)
+        return _get_scopes(scope_value)
+
class BigQueryConnection:
"""BigQuery connection.
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 47b6d228d5..68b423fb46 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -1217,6 +1217,7 @@ class BigQueryExecuteQueryOperator(GoogleCloudBaseOperator):
location: str | None = None,
encryption_configuration: dict | None = None,
impersonation_chain: str | Sequence[str] | None = None,
+ impersonation_scopes: str | Sequence[str] | None = None,
job_id: str | list[str] | None = None,
**kwargs,
) -> None:
@@ -1243,6 +1244,7 @@ class BigQueryExecuteQueryOperator(GoogleCloudBaseOperator):
self.encryption_configuration = encryption_configuration
self.hook: BigQueryHook | None = None
self.impersonation_chain = impersonation_chain
+ self.impersonation_scopes = impersonation_scopes
self.job_id = job_id
def execute(self, context: Context):
@@ -1253,6 +1255,7 @@ class BigQueryExecuteQueryOperator(GoogleCloudBaseOperator):
use_legacy_sql=self.use_legacy_sql,
location=self.location,
impersonation_chain=self.impersonation_chain,
+ impersonation_scopes=self.impersonation_scopes,
)
if isinstance(self.sql, str):
self.job_id = self.hook.run_query(
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index ae0bcbfb89..f1cc16a344 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -543,9 +543,24 @@ class TestBigQueryOperator:
api_resource_configs=None,
cluster_fields=None,
encryption_configuration=encryption_configuration,
+
impersonation_chain=["[email protected]"],
+ impersonation_scopes=[
+ "https://www.googleapis.com/auth/cloud-platform",
+ "https://www.googleapis.com/auth/drive",
+ ],
)
operator.execute(MagicMock())
+ mock_hook.assert_called_with(
+ gcp_conn_id="google_cloud_default",
+ use_legacy_sql=True,
+ location=None,
+
impersonation_chain=["[email protected]"],
+ impersonation_scopes=[
+ "https://www.googleapis.com/auth/cloud-platform",
+ "https://www.googleapis.com/auth/drive",
+ ],
+ )
mock_hook.return_value.run_query.assert_called_once_with(
sql="Select * from test_table",
destination_dataset_table=None,