This is an automated email from the ASF dual-hosted git repository.

pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c14767638 Fix google operators handling of impersonation chain 
(#36903)
1c14767638 is described below

commit 1c14767638c26dbfaa2b984f9f5bbeb483bd88cf
Author: Cedrik Neumann <[email protected]>
AuthorDate: Tue Jan 23 11:14:15 2024 +0100

    Fix google operators handling of impersonation chain (#36903)
    
    * fix(MetastoreHivePartitionSensor): pass impersonation chain to GCSHook
    
    The operator already accepts `impersonation_chain`, but does not pass
    it to the GCSHook.
    
    * fix(BigQueryGetDataOperator): pass impersonation chain to 
BigQueryGetDataTrigger
    
    The operator already accepts `impersonation_chain`, but does not pass
    it to the BigQueryGetDataTrigger.
    
    * fix(BigQueryInsertJobOperator): pass impersonation chain to 
BigQueryInsertJobTrigger
    
    The operator already accepts `impersonation_chain`, but does not pass
    it to the BigQueryInsertJobTrigger.
    
    * fix(BigQueryToGCSOperator): pass impersonation chain to 
BigQueryInsertJobTrigger
    
    The operator already accepts `impersonation_chain`, but does not pass
    it to the BigQueryInsertJobTrigger.
    
    * fix(GCSToBigQueryOperator): pass impersonation chain to 
BigQueryInsertJobTrigger
    
    The operator already accepts `impersonation_chain`, but does not pass
    it to the BigQueryInsertJobTrigger.
---
 airflow/providers/google/cloud/hooks/gcs.py                | 11 +++++++++--
 airflow/providers/google/cloud/operators/bigquery.py       |  2 ++
 .../providers/google/cloud/sensors/dataproc_metastore.py   | 14 ++++++++++++--
 .../providers/google/cloud/transfers/bigquery_to_gcs.py    |  1 +
 .../providers/google/cloud/transfers/gcs_to_bigquery.py    |  1 +
 5 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/gcs.py 
b/airflow/providers/google/cloud/hooks/gcs.py
index f784b3fbbb..d904f00f4d 100644
--- a/airflow/providers/google/cloud/hooks/gcs.py
+++ b/airflow/providers/google/cloud/hooks/gcs.py
@@ -1336,7 +1336,11 @@ def gcs_object_is_directory(bucket: str) -> bool:
     return len(blob) == 0 or blob.endswith("/")
 
 
-def parse_json_from_gcs(gcp_conn_id: str, file_uri: str) -> Any:
+def parse_json_from_gcs(
+    gcp_conn_id: str,
+    file_uri: str,
+    impersonation_chain: str | Sequence[str] | None = None,
+) -> Any:
     """
     Downloads and parses json file from Google cloud Storage.
 
@@ -1344,7 +1348,10 @@ def parse_json_from_gcs(gcp_conn_id: str, file_uri: str) 
-> Any:
     :param file_uri: full path to json file
         example: ``gs://test-bucket/dir1/dir2/file``
     """
-    gcs_hook = GCSHook(gcp_conn_id=gcp_conn_id)
+    gcs_hook = GCSHook(
+        gcp_conn_id=gcp_conn_id,
+        impersonation_chain=impersonation_chain,
+    )
     bucket, blob = _parse_gcs_url(file_uri)
     with NamedTemporaryFile(mode="w+b") as file:
         try:
diff --git a/airflow/providers/google/cloud/operators/bigquery.py 
b/airflow/providers/google/cloud/operators/bigquery.py
index f10e5fbf5c..d6b2bf8c10 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -1069,6 +1069,7 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
                 project_id=self.job_project_id or hook.project_id,
                 poll_interval=self.poll_interval,
                 as_dict=self.as_dict,
+                impersonation_chain=self.impersonation_chain,
             ),
             method_name="execute_complete",
         )
@@ -2878,6 +2879,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, 
_BigQueryOpenLineageMix
                         job_id=self.job_id,
                         project_id=self.project_id,
                         poll_interval=self.poll_interval,
+                        impersonation_chain=self.impersonation_chain,
                     ),
                     method_name="execute_complete",
                 )
diff --git a/airflow/providers/google/cloud/sensors/dataproc_metastore.py 
b/airflow/providers/google/cloud/sensors/dataproc_metastore.py
index ccb2226452..3ebf5c0f3c 100644
--- a/airflow/providers/google/cloud/sensors/dataproc_metastore.py
+++ b/airflow/providers/google/cloud/sensors/dataproc_metastore.py
@@ -93,7 +93,11 @@ class MetastoreHivePartitionSensor(BaseSensorOperator):
         self.log.info("Received result manifest URI: %s", result_manifest_uri)
 
         self.log.info("Extracting result manifest")
-        manifest: dict = parse_json_from_gcs(gcp_conn_id=self.gcp_conn_id, 
file_uri=result_manifest_uri)
+        manifest: dict = parse_json_from_gcs(
+            gcp_conn_id=self.gcp_conn_id,
+            file_uri=result_manifest_uri,
+            impersonation_chain=self.impersonation_chain,
+        )
         if not (manifest and isinstance(manifest, dict)):
             # TODO: remove this if check when min_airflow_version is set to 
higher than 2.7.1
             message = (
@@ -115,7 +119,13 @@ class MetastoreHivePartitionSensor(BaseSensorOperator):
         result_base_uri = result_manifest_uri.rsplit("/", 1)[0]
         results = (f"{result_base_uri}/{filename}" for filename in manifest.get("filenames", []))
         found_partitions = sum(
-            len(parse_json_from_gcs(gcp_conn_id=self.gcp_conn_id, 
file_uri=uri).get("rows", []))
+            len(
+                parse_json_from_gcs(
+                    gcp_conn_id=self.gcp_conn_id,
+                    file_uri=uri,
+                    impersonation_chain=self.impersonation_chain,
+                ).get("rows", [])
+            )
             for uri in results
         )
 
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py 
b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index 58456b10f9..3ede4db32f 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -261,6 +261,7 @@ class BigQueryToGCSOperator(BaseOperator):
                     conn_id=self.gcp_conn_id,
                     job_id=self._job_id,
                     project_id=self.project_id or self.hook.project_id,
+                    impersonation_chain=self.impersonation_chain,
                 ),
                 method_name="execute_complete",
             )
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py 
b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index b22b7d8f8d..9d8ce53f4c 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -435,6 +435,7 @@ class GCSToBigQueryOperator(BaseOperator):
                         conn_id=self.gcp_conn_id,
                         job_id=self.job_id,
                         project_id=self.project_id or self.hook.project_id,
+                        impersonation_chain=self.impersonation_chain,
                     ),
                     method_name="execute_complete",
                 )

Reply via email to