This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new db16eeb6ef fix BigQueryInsertJobOperator's return value and openlineage extraction in deferrable mode (#40457)
db16eeb6ef is described below

commit db16eeb6efd6006efc94d2ec9f7740fe96edf37c
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Jul 2 11:29:01 2024 +0200

    fix BigQueryInsertJobOperator's return value and openlineage extraction in deferrable mode (#40457)
    
    Signed-off-by: Kacper Muda <[email protected]>
---
 .../providers/google/cloud/openlineage/mixins.py   | 10 +++++++++
 .../providers/google/cloud/operators/bigquery.py   |  2 ++
 .../google/cloud/operators/test_bigquery.py        | 26 ++++++++++++++++++++++
 3 files changed, 38 insertions(+)

diff --git a/airflow/providers/google/cloud/openlineage/mixins.py b/airflow/providers/google/cloud/openlineage/mixins.py
index 691cec5884..48ff695c72 100644
--- a/airflow/providers/google/cloud/openlineage/mixins.py
+++ b/airflow/providers/google/cloud/openlineage/mixins.py
@@ -67,8 +67,18 @@ class _BigQueryOpenLineageMixin:
         from airflow.providers.openlineage.sqlparser import SQLParser
 
         if not self.job_id:
+            if hasattr(self, "log"):
+                self.log.warning("No BigQuery job_id was found by OpenLineage.")
             return OperatorLineage()
 
+        if not self.hook:
+            from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
+
+            self.hook = BigQueryHook(
+                gcp_conn_id=self.gcp_conn_id,
+                impersonation_chain=self.impersonation_chain,
+            )
+
         run_facets: dict[str, BaseFacet] = {
             "externalQuery": ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery")
         }
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index bed2c79ee5..d3f79c9bbd 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -3038,6 +3038,8 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
             self.task_id,
             event["message"],
         )
+        # Save job_id as an attribute to be later used by listeners
+        self.job_id = event.get("job_id")
         return self.job_id
 
     def on_kill(self) -> None:
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 6f4e76baa7..febcfe4871 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -1691,6 +1691,32 @@ class TestBigQueryInsertJobOperator:
             "%s completed with response %s ", "insert_query_job", "Job completed"
         )
 
+    def test_bigquery_insert_job_operator_execute_complete_reassigns_job_id(self):
+        """Assert that we use job_id from event after deferral."""
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
+        }
+        job_id = "123456"
+
+        operator = BigQueryInsertJobOperator(
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=None,  # We are not passing anything here on purpose
+            project_id=TEST_GCP_PROJECT_ID,
+            deferrable=True,
+        )
+
+        returned_job_id = operator.execute_complete(
+            context=MagicMock(),
+            event={"status": "success", "message": "Job completed", "job_id": job_id},
+        )
+        assert returned_job_id == job_id
+        assert operator.job_id == job_id
+
     @pytest.mark.db_test
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_bigquery_insert_job_operator_with_job_id_generate(

Reply via email to