This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ba3665f76a Bigquery: fix links for already existing tables and datasets. (#31589)
ba3665f76a is described below
commit ba3665f76a2205bad4553ba00537026a1346e9ae
Author: Beata Kossakowska <[email protected]>
AuthorDate: Thu Jun 8 21:20:07 2023 +0200
Bigquery: fix links for already existing tables and datasets. (#31589)
Co-authored-by: Beata Kossakowska <[email protected]>
---
.../providers/google/cloud/operators/bigquery.py | 57 +++++++++++++++-------
1 file changed, 39 insertions(+), 18 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 7f9c38c515..2fd6aacf5c 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -1416,13 +1416,13 @@ class BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
table_resource=self.table_resource,
exists_ok=self.if_exists == IfExistAction.IGNORE,
)
- BigQueryTableLink.persist(
- context=context,
- task_instance=self,
- dataset_id=table.to_api_repr()["tableReference"]["datasetId"],
- project_id=table.to_api_repr()["tableReference"]["projectId"],
- table_id=table.to_api_repr()["tableReference"]["tableId"],
- )
+ persist_kwargs = {
+ "context": context,
+ "task_instance": self,
+ "project_id":
table.to_api_repr()["tableReference"]["projectId"],
+ "dataset_id":
table.to_api_repr()["tableReference"]["datasetId"],
+ "table_id": table.to_api_repr()["tableReference"]["tableId"],
+ }
self.log.info(
"Table %s.%s.%s created successfully", table.project,
table.dataset_id, table.table_id
)
@@ -1430,11 +1430,20 @@ class BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
error_msg = f"Table {self.dataset_id}.{self.table_id} already
exists."
if self.if_exists == IfExistAction.LOG:
self.log.info(error_msg)
+ persist_kwargs = {
+ "context": context,
+ "task_instance": self,
+ "project_id": self.project_id or bq_hook.project_id,
+ "dataset_id": self.dataset_id,
+ "table_id": self.table_id,
+ }
elif self.if_exists == IfExistAction.FAIL:
raise AirflowException(error_msg)
else:
raise AirflowSkipException(error_msg)
+ BigQueryTableLink.persist(**persist_kwargs)
+
class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
"""
@@ -1896,14 +1905,24 @@ class BigQueryCreateEmptyDatasetOperator(GoogleCloudBaseOperator):
location=self.location,
exists_ok=self.if_exists == IfExistAction.IGNORE,
)
- BigQueryDatasetLink.persist(
- context=context,
- task_instance=self,
- dataset_id=dataset["datasetReference"]["datasetId"],
- project_id=dataset["datasetReference"]["projectId"],
- )
+ persist_kwargs = {
+ "context": context,
+ "task_instance": self,
+ "project_id": dataset["datasetReference"]["projectId"],
+ "dataset_id": dataset["datasetReference"]["datasetId"],
+ }
+
except Conflict:
dataset_id = self.dataset_reference.get("datasetReference",
{}).get("datasetId", self.dataset_id)
+ project_id = self.dataset_reference.get("datasetReference",
{}).get(
+ "projectId", self.project_id or bq_hook.project_id
+ )
+ persist_kwargs = {
+ "context": context,
+ "task_instance": self,
+ "project_id": project_id,
+ "dataset_id": dataset_id,
+ }
error_msg = f"Dataset {dataset_id} already exists."
if self.if_exists == IfExistAction.LOG:
self.log.info(error_msg)
@@ -1911,6 +1930,7 @@ class BigQueryCreateEmptyDatasetOperator(GoogleCloudBaseOperator):
raise AirflowException(error_msg)
else:
raise AirflowSkipException(error_msg)
+ BigQueryDatasetLink.persist(**persist_kwargs)
class BigQueryGetDatasetOperator(GoogleCloudBaseOperator):
@@ -1975,7 +1995,7 @@ class BigQueryGetDatasetOperator(GoogleCloudBaseOperator):
dataset_id=dataset_api_repr["datasetReference"]["datasetId"],
project_id=dataset_api_repr["datasetReference"]["projectId"],
)
- return dataset
+ return dataset_api_repr
class BigQueryGetDatasetTablesOperator(GoogleCloudBaseOperator):
@@ -2272,7 +2292,7 @@ class BigQueryUpdateDatasetOperator(GoogleCloudBaseOperator):
dataset_id=dataset_api_repr["datasetReference"]["datasetId"],
project_id=dataset_api_repr["datasetReference"]["projectId"],
)
- return dataset
+ return dataset_api_repr
class BigQueryDeleteTableOperator(GoogleCloudBaseOperator):
@@ -2688,7 +2708,8 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator):
QueryJob._JOB_TYPE: ["destinationTable"],
}
- if self.project_id:
+ project_id = self.project_id or hook.project_id
+ if project_id:
for job_type, tables_prop in job_types.items():
job_configuration = job.to_api_repr()["configuration"]
if job_type in job_configuration:
@@ -2698,13 +2719,13 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator):
persist_kwargs = {
"context": context,
"task_instance": self,
- "project_id": self.project_id,
+ "project_id": project_id,
"table_id": table,
}
if not isinstance(table, str):
persist_kwargs["table_id"] = table["tableId"]
persist_kwargs["dataset_id"] =
table["datasetId"]
-
+ persist_kwargs["project_id"] =
table["projectId"]
BigQueryTableLink.persist(**persist_kwargs)
self.job_id = job.job_id