This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new db16eeb6ef fix BigQueryInsertJobOperator's return value and openlineage extraction in deferrable mode (#40457)
db16eeb6ef is described below
commit db16eeb6efd6006efc94d2ec9f7740fe96edf37c
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Jul 2 11:29:01 2024 +0200
fix BigQueryInsertJobOperator's return value and openlineage extraction in deferrable mode (#40457)
Signed-off-by: Kacper Muda <[email protected]>
---
.../providers/google/cloud/openlineage/mixins.py | 10 +++++++++
.../providers/google/cloud/operators/bigquery.py | 2 ++
.../google/cloud/operators/test_bigquery.py | 26 ++++++++++++++++++++++
3 files changed, 38 insertions(+)
diff --git a/airflow/providers/google/cloud/openlineage/mixins.py b/airflow/providers/google/cloud/openlineage/mixins.py
index 691cec5884..48ff695c72 100644
--- a/airflow/providers/google/cloud/openlineage/mixins.py
+++ b/airflow/providers/google/cloud/openlineage/mixins.py
@@ -67,8 +67,18 @@ class _BigQueryOpenLineageMixin:
from airflow.providers.openlineage.sqlparser import SQLParser
if not self.job_id:
+ if hasattr(self, "log"):
+ self.log.warning("No BigQuery job_id was found by
OpenLineage.")
return OperatorLineage()
+ if not self.hook:
+ from airflow.providers.google.cloud.hooks.bigquery import
BigQueryHook
+
+ self.hook = BigQueryHook(
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain,
+ )
+
run_facets: dict[str, BaseFacet] = {
"externalQuery":
ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery")
}
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index bed2c79ee5..d3f79c9bbd 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -3038,6 +3038,8 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
self.task_id,
event["message"],
)
+ # Save job_id as an attribute to be later used by listeners
+ self.job_id = event.get("job_id")
return self.job_id
def on_kill(self) -> None:
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 6f4e76baa7..febcfe4871 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -1691,6 +1691,32 @@ class TestBigQueryInsertJobOperator:
"%s completed with response %s ", "insert_query_job", "Job
completed"
)
+ def
test_bigquery_insert_job_operator_execute_complete_reassigns_job_id(self):
+ """Assert execute_complete returns and stores the job_id taken from the event after deferral."""
+ configuration = {
+ "query": {
+ "query": "SELECT * FROM any",
+ "useLegacySql": False,
+ }
+ }
+ job_id = "123456"
+
+ operator = BigQueryInsertJobOperator(
+ task_id="insert_query_job",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ job_id=None, # Deliberately unset: the job_id must come from the event
+ project_id=TEST_GCP_PROJECT_ID,
+ deferrable=True,
+ )
+
+ returned_job_id = operator.execute_complete(
+ context=MagicMock(),
+ event={"status": "success", "message": "Job completed", "job_id":
job_id},
+ )
+ assert returned_job_id == job_id
+ assert operator.job_id == job_id  # kept on the operator for later use by listeners
+
@pytest.mark.db_test
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_bigquery_insert_job_operator_with_job_id_generate(