This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ba3665f76a Bigquery: fix links for already existing tables and datasets. (#31589)
ba3665f76a is described below

commit ba3665f76a2205bad4553ba00537026a1346e9ae
Author: Beata Kossakowska <[email protected]>
AuthorDate: Thu Jun 8 21:20:07 2023 +0200

    Bigquery: fix links for already existing tables and datasets. (#31589)
    
    Co-authored-by: Beata Kossakowska <[email protected]>
---
 .../providers/google/cloud/operators/bigquery.py   | 57 +++++++++++++++-------
 1 file changed, 39 insertions(+), 18 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 7f9c38c515..2fd6aacf5c 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -1416,13 +1416,13 @@ class 
BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
                 table_resource=self.table_resource,
                 exists_ok=self.if_exists == IfExistAction.IGNORE,
             )
-            BigQueryTableLink.persist(
-                context=context,
-                task_instance=self,
-                dataset_id=table.to_api_repr()["tableReference"]["datasetId"],
-                project_id=table.to_api_repr()["tableReference"]["projectId"],
-                table_id=table.to_api_repr()["tableReference"]["tableId"],
-            )
+            persist_kwargs = {
+                "context": context,
+                "task_instance": self,
+                "project_id": 
table.to_api_repr()["tableReference"]["projectId"],
+                "dataset_id": 
table.to_api_repr()["tableReference"]["datasetId"],
+                "table_id": table.to_api_repr()["tableReference"]["tableId"],
+            }
             self.log.info(
                 "Table %s.%s.%s created successfully", table.project, 
table.dataset_id, table.table_id
             )
@@ -1430,11 +1430,20 @@ class 
BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
             error_msg = f"Table {self.dataset_id}.{self.table_id} already 
exists."
             if self.if_exists == IfExistAction.LOG:
                 self.log.info(error_msg)
+                persist_kwargs = {
+                    "context": context,
+                    "task_instance": self,
+                    "project_id": self.project_id or bq_hook.project_id,
+                    "dataset_id": self.dataset_id,
+                    "table_id": self.table_id,
+                }
             elif self.if_exists == IfExistAction.FAIL:
                 raise AirflowException(error_msg)
             else:
                 raise AirflowSkipException(error_msg)
 
+        BigQueryTableLink.persist(**persist_kwargs)
+
 
 class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
     """
@@ -1896,14 +1905,24 @@ class 
BigQueryCreateEmptyDatasetOperator(GoogleCloudBaseOperator):
                 location=self.location,
                 exists_ok=self.if_exists == IfExistAction.IGNORE,
             )
-            BigQueryDatasetLink.persist(
-                context=context,
-                task_instance=self,
-                dataset_id=dataset["datasetReference"]["datasetId"],
-                project_id=dataset["datasetReference"]["projectId"],
-            )
+            persist_kwargs = {
+                "context": context,
+                "task_instance": self,
+                "project_id": dataset["datasetReference"]["projectId"],
+                "dataset_id": dataset["datasetReference"]["datasetId"],
+            }
+
         except Conflict:
             dataset_id = self.dataset_reference.get("datasetReference", 
{}).get("datasetId", self.dataset_id)
+            project_id = self.dataset_reference.get("datasetReference", 
{}).get(
+                "projectId", self.project_id or bq_hook.project_id
+            )
+            persist_kwargs = {
+                "context": context,
+                "task_instance": self,
+                "project_id": project_id,
+                "dataset_id": dataset_id,
+            }
             error_msg = f"Dataset {dataset_id} already exists."
             if self.if_exists == IfExistAction.LOG:
                 self.log.info(error_msg)
@@ -1911,6 +1930,7 @@ class 
BigQueryCreateEmptyDatasetOperator(GoogleCloudBaseOperator):
                 raise AirflowException(error_msg)
             else:
                 raise AirflowSkipException(error_msg)
+        BigQueryDatasetLink.persist(**persist_kwargs)
 
 
 class BigQueryGetDatasetOperator(GoogleCloudBaseOperator):
@@ -1975,7 +1995,7 @@ class BigQueryGetDatasetOperator(GoogleCloudBaseOperator):
             dataset_id=dataset_api_repr["datasetReference"]["datasetId"],
             project_id=dataset_api_repr["datasetReference"]["projectId"],
         )
-        return dataset
+        return dataset_api_repr
 
 
 class BigQueryGetDatasetTablesOperator(GoogleCloudBaseOperator):
@@ -2272,7 +2292,7 @@ class 
BigQueryUpdateDatasetOperator(GoogleCloudBaseOperator):
             dataset_id=dataset_api_repr["datasetReference"]["datasetId"],
             project_id=dataset_api_repr["datasetReference"]["projectId"],
         )
-        return dataset
+        return dataset_api_repr
 
 
 class BigQueryDeleteTableOperator(GoogleCloudBaseOperator):
@@ -2688,7 +2708,8 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator):
             QueryJob._JOB_TYPE: ["destinationTable"],
         }
 
-        if self.project_id:
+        project_id = self.project_id or hook.project_id
+        if project_id:
             for job_type, tables_prop in job_types.items():
                 job_configuration = job.to_api_repr()["configuration"]
                 if job_type in job_configuration:
@@ -2698,13 +2719,13 @@ class 
BigQueryInsertJobOperator(GoogleCloudBaseOperator):
                             persist_kwargs = {
                                 "context": context,
                                 "task_instance": self,
-                                "project_id": self.project_id,
+                                "project_id": project_id,
                                 "table_id": table,
                             }
                             if not isinstance(table, str):
                                 persist_kwargs["table_id"] = table["tableId"]
                                 persist_kwargs["dataset_id"] = 
table["datasetId"]
-
+                                persist_kwargs["project_id"] = 
table["projectId"]
                             BigQueryTableLink.persist(**persist_kwargs)
 
         self.job_id = job.job_id

Reply via email to