This is an automated email from the ASF dual-hosted git repository.
pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1c14767638 Fix google operators handling of impersonation chain (#36903)
1c14767638 is described below
commit 1c14767638c26dbfaa2b984f9f5bbeb483bd88cf
Author: Cedrik Neumann <[email protected]>
AuthorDate: Tue Jan 23 11:14:15 2024 +0100
Fix google operators handling of impersonation chain (#36903)
* fix(MetastoreHivePartitionSensor): pass impersonation chain to GCSHook
The operator already accepts `impersonation_chain`, but does not pass
it to the GCSHook.
* fix(BigQueryGetDataOperator): pass impersonation chain to
BigQueryGetDataTrigger
The operator already accepts `impersonation_chain`, but does not pass
it to the BigQueryGetDataTrigger.
* fix(BigQueryInsertJobOperator): pass impersonation chain to
BigQueryInsertJobTrigger
The operator already accepts `impersonation_chain`, but does not pass
it to the BigQueryInsertJobTrigger.
* fix(BigQueryToGCSOperator): pass impersonation chain to
BigQueryInsertJobTrigger
The operator already accepts `impersonation_chain`, but does not pass
it to the BigQueryInsertJobTrigger.
* fix(GCSToBigQueryOperator): pass impersonation chain to
BigQueryInsertJobTrigger
The operator already accepts `impersonation_chain`, but does not pass
it to the BigQueryInsertJobTrigger.
---
airflow/providers/google/cloud/hooks/gcs.py | 11 +++++++++--
airflow/providers/google/cloud/operators/bigquery.py | 2 ++
.../providers/google/cloud/sensors/dataproc_metastore.py | 14 ++++++++++++--
.../providers/google/cloud/transfers/bigquery_to_gcs.py | 1 +
.../providers/google/cloud/transfers/gcs_to_bigquery.py | 1 +
5 files changed, 25 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/gcs.py
b/airflow/providers/google/cloud/hooks/gcs.py
index f784b3fbbb..d904f00f4d 100644
--- a/airflow/providers/google/cloud/hooks/gcs.py
+++ b/airflow/providers/google/cloud/hooks/gcs.py
@@ -1336,7 +1336,11 @@ def gcs_object_is_directory(bucket: str) -> bool:
return len(blob) == 0 or blob.endswith("/")
-def parse_json_from_gcs(gcp_conn_id: str, file_uri: str) -> Any:
+def parse_json_from_gcs(
+ gcp_conn_id: str,
+ file_uri: str,
+ impersonation_chain: str | Sequence[str] | None = None,
+) -> Any:
"""
Downloads and parses json file from Google cloud Storage.
@@ -1344,7 +1348,10 @@ def parse_json_from_gcs(gcp_conn_id: str, file_uri: str) -> Any:
:param file_uri: full path to json file
example: ``gs://test-bucket/dir1/dir2/file``
"""
- gcs_hook = GCSHook(gcp_conn_id=gcp_conn_id)
+ gcs_hook = GCSHook(
+ gcp_conn_id=gcp_conn_id,
+ impersonation_chain=impersonation_chain,
+ )
bucket, blob = _parse_gcs_url(file_uri)
with NamedTemporaryFile(mode="w+b") as file:
try:
diff --git a/airflow/providers/google/cloud/operators/bigquery.py
b/airflow/providers/google/cloud/operators/bigquery.py
index f10e5fbf5c..d6b2bf8c10 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -1069,6 +1069,7 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
project_id=self.job_project_id or hook.project_id,
poll_interval=self.poll_interval,
as_dict=self.as_dict,
+ impersonation_chain=self.impersonation_chain,
),
method_name="execute_complete",
)
@@ -2878,6 +2879,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMixin):
job_id=self.job_id,
project_id=self.project_id,
poll_interval=self.poll_interval,
+ impersonation_chain=self.impersonation_chain,
),
method_name="execute_complete",
)
diff --git a/airflow/providers/google/cloud/sensors/dataproc_metastore.py
b/airflow/providers/google/cloud/sensors/dataproc_metastore.py
index ccb2226452..3ebf5c0f3c 100644
--- a/airflow/providers/google/cloud/sensors/dataproc_metastore.py
+++ b/airflow/providers/google/cloud/sensors/dataproc_metastore.py
@@ -93,7 +93,11 @@ class MetastoreHivePartitionSensor(BaseSensorOperator):
self.log.info("Received result manifest URI: %s", result_manifest_uri)
self.log.info("Extracting result manifest")
- manifest: dict = parse_json_from_gcs(gcp_conn_id=self.gcp_conn_id, file_uri=result_manifest_uri)
+ manifest: dict = parse_json_from_gcs(
+ gcp_conn_id=self.gcp_conn_id,
+ file_uri=result_manifest_uri,
+ impersonation_chain=self.impersonation_chain,
+ )
if not (manifest and isinstance(manifest, dict)):
# TODO: remove this if check when min_airflow_version is set to higher than 2.7.1
message = (
@@ -115,7 +119,13 @@ class MetastoreHivePartitionSensor(BaseSensorOperator):
result_base_uri = result_manifest_uri.rsplit("/", 1)[0]
results = (f"{result_base_uri}/{filename}" for filename in manifest.get("filenames", []))
found_partitions = sum(
- len(parse_json_from_gcs(gcp_conn_id=self.gcp_conn_id, file_uri=uri).get("rows", []))
+ len(
+ parse_json_from_gcs(
+ gcp_conn_id=self.gcp_conn_id,
+ file_uri=uri,
+ impersonation_chain=self.impersonation_chain,
+ ).get("rows", [])
+ )
for uri in results
)
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index 58456b10f9..3ede4db32f 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -261,6 +261,7 @@ class BigQueryToGCSOperator(BaseOperator):
conn_id=self.gcp_conn_id,
job_id=self._job_id,
project_id=self.project_id or self.hook.project_id,
+ impersonation_chain=self.impersonation_chain,
),
method_name="execute_complete",
)
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index b22b7d8f8d..9d8ce53f4c 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -435,6 +435,7 @@ class GCSToBigQueryOperator(BaseOperator):
conn_id=self.gcp_conn_id,
job_id=self.job_id,
project_id=self.project_id or self.hook.project_id,
+ impersonation_chain=self.impersonation_chain,
),
method_name="execute_complete",
)