This is an automated email from the ASF dual-hosted git repository.

pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 774125ae25 fix: BigQuery job error message (#34208)
774125ae25 is described below

commit 774125ae253611627229509e672518ce0a58cf2e
Author: Nathan Hadfield <[email protected]>
AuthorDate: Sat Sep 9 07:56:26 2023 +0100

    fix: BigQuery job error message (#34208)
    
    * Modified output to return a dict also containing a job message
    
    * Modified async functions to handle the dict response from `get_job_status`
    
    * Updated expected states to reflect dict output
    
    * Updated tests to check dictionary output
---
 airflow/providers/google/cloud/hooks/bigquery.py   |  8 +--
 .../providers/google/cloud/triggers/bigquery.py    | 74 +++++++++++++---------
 .../providers/google/cloud/hooks/test_bigquery.py  |  9 ++-
 .../google/cloud/triggers/test_bigquery.py         | 70 ++++++++++++--------
 4 files changed, 99 insertions(+), 62 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 18ad159a98..c98e7e92ab 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -3099,16 +3099,16 @@ class BigQueryAsyncHook(GoogleBaseAsyncHook):
         with await self.service_file_as_context() as f:
             return Job(job_id=job_id, project=project_id, service_file=f, 
session=cast(Session, session))
 
-    async def get_job_status(self, job_id: str | None, project_id: str | None 
= None) -> str:
+    async def get_job_status(self, job_id: str | None, project_id: str | None 
= None) -> dict[str, str]:
         async with ClientSession() as s:
             job_client = await self.get_job_instance(project_id, job_id, s)
             job = await job_client.get_job()
             status = job.get("status", {})
             if status["state"] == "DONE":
                 if "errorResult" in status:
-                    return "error"
-                return "success"
-            return status["state"].lower()
+                    return {"status": "error", "message": 
status["errorResult"]["message"]}
+                return {"status": "success", "message": "Job completed"}
+            return {"status": status["state"].lower(), "message": "Job 
running"}
 
     async def get_job_output(
         self,
diff --git a/airflow/providers/google/cloud/triggers/bigquery.py b/airflow/providers/google/cloud/triggers/bigquery.py
index 1f80479d8b..5bd9cc8385 100644
--- a/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/airflow/providers/google/cloud/triggers/bigquery.py
@@ -72,27 +72,28 @@ class BigQueryInsertJobTrigger(BaseTrigger):
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:  # type: 
ignore[override]
-        """Gets current job execution status and yields a TriggerEvent."""
         """Gets current job execution status and yields a TriggerEvent."""
         hook = self._get_async_hook()
         try:
             while True:
                 job_status = await hook.get_job_status(job_id=self.job_id, 
project_id=self.project_id)
-                if job_status == "success":
+                if job_status["status"] == "success":
                     yield TriggerEvent(
                         {
                             "job_id": self.job_id,
-                            "status": job_status,
-                            "message": "Job completed",
+                            "status": job_status["status"],
+                            "message": job_status["message"],
                         }
                     )
                     return
-                elif job_status == "error":
-                    yield TriggerEvent({"status": "error"})
+                elif job_status["status"] == "error":
+                    yield TriggerEvent(job_status)
                     return
                 else:
                     self.log.info(
-                        "Bigquery job status is %s. Sleeping for %s seconds.", 
job_status, self.poll_interval
+                        "Bigquery job status is %s. Sleeping for %s seconds.",
+                        job_status["status"],
+                        self.poll_interval,
                     )
                     await asyncio.sleep(self.poll_interval)
         except Exception as e:
@@ -127,16 +128,16 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
             while True:
                 # Poll for job execution status
                 job_status = await hook.get_job_status(job_id=self.job_id, 
project_id=self.project_id)
-                if job_status == "success":
+                if job_status["status"] == "success":
                     query_results = await 
hook.get_job_output(job_id=self.job_id, project_id=self.project_id)
-
                     records = hook.get_records(query_results)
 
                     # If empty list, then no records are available
                     if not records:
                         yield TriggerEvent(
                             {
-                                "status": "success",
+                                "status": job_status["status"],
+                                "message": job_status["message"],
                                 "records": None,
                             }
                         )
@@ -146,17 +147,20 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
                         first_record = records.pop(0)
                         yield TriggerEvent(
                             {
-                                "status": "success",
+                                "status": job_status["status"],
+                                "message": job_status["message"],
                                 "records": first_record,
                             }
                         )
                         return
-                elif job_status == "error":
-                    yield TriggerEvent({"status": "error", "message": 
job_status})
+                elif job_status["status"] == "error":
+                    yield TriggerEvent({"status": "error", "message": 
job_status["message"]})
                     return
                 else:
                     self.log.info(
-                        "Bigquery job status is %s. Sleeping for %s seconds.", 
job_status, self.poll_interval
+                        "Bigquery job status is %s. Sleeping for %s seconds.",
+                        job_status["status"],
+                        self.poll_interval,
                     )
                     await asyncio.sleep(self.poll_interval)
         except Exception as e:
@@ -198,24 +202,26 @@ class BigQueryGetDataTrigger(BigQueryInsertJobTrigger):
             while True:
                 # Poll for job execution status
                 job_status = await hook.get_job_status(job_id=self.job_id, 
project_id=self.project_id)
-                if job_status == "success":
+                if job_status["status"] == "success":
                     query_results = await 
hook.get_job_output(job_id=self.job_id, project_id=self.project_id)
                     records = hook.get_records(query_results=query_results, 
as_dict=self.as_dict)
-                    self.log.debug("Response from hook: %s", job_status)
+                    self.log.debug("Response from hook: %s", 
job_status["status"])
                     yield TriggerEvent(
                         {
                             "status": "success",
-                            "message": job_status,
+                            "message": job_status["message"],
                             "records": records,
                         }
                     )
                     return
-                elif job_status == "error":
-                    yield TriggerEvent({"status": "error"})
+                elif job_status["status"] == "error":
+                    yield TriggerEvent(job_status)
                     return
                 else:
                     self.log.info(
-                        "Bigquery job status is %s. Sleeping for %s seconds.", 
job_status, self.poll_interval
+                        "Bigquery job status is %s. Sleeping for %s seconds.",
+                        job_status["status"],
+                        self.poll_interval,
                     )
                     await asyncio.sleep(self.poll_interval)
         except Exception as e:
@@ -308,7 +314,10 @@ class BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
                     job_id=self.second_job_id, project_id=self.project_id
                 )
 
-                if first_job_response_from_hook == "success" and 
second_job_response_from_hook == "success":
+                if (
+                    first_job_response_from_hook["status"] == "success"
+                    and second_job_response_from_hook["status"] == "success"
+                ):
                     first_query_results = await hook.get_job_output(
                         job_id=self.first_job_id, project_id=self.project_id
                     )
@@ -352,13 +361,16 @@ class BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
                         }
                     )
                     return
-                elif first_job_response_from_hook == "pending" or 
second_job_response_from_hook == "pending":
+                elif (
+                    first_job_response_from_hook["status"] == "pending"
+                    or second_job_response_from_hook["status"] == "pending"
+                ):
                     self.log.info("Query is still running...")
                     self.log.info("Sleeping for %s seconds.", 
self.poll_interval)
                     await asyncio.sleep(self.poll_interval)
                 else:
                     yield TriggerEvent(
-                        {"status": "error", "message": 
second_job_response_from_hook, "data": None}
+                        {"status": "error", "message": 
second_job_response_from_hook["message"], "data": None}
                     )
                     return
 
@@ -430,19 +442,21 @@ class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
             while True:
                 # Poll for job execution status
                 response_from_hook = await 
hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
-                if response_from_hook == "success":
+                if response_from_hook["status"] == "success":
                     query_results = await 
hook.get_job_output(job_id=self.job_id, project_id=self.project_id)
                     records = hook.get_records(query_results)
                     records = records.pop(0) if records else None
                     hook.value_check(self.sql, self.pass_value, records, 
self.tolerance)
                     yield TriggerEvent({"status": "success", "message": "Job 
completed", "records": records})
                     return
-                elif response_from_hook == "pending":
+                elif response_from_hook["status"] == "pending":
                     self.log.info("Query is still running...")
                     self.log.info("Sleeping for %s seconds.", 
self.poll_interval)
                     await asyncio.sleep(self.poll_interval)
                 else:
-                    yield TriggerEvent({"status": "error", "message": 
response_from_hook, "records": None})
+                    yield TriggerEvent(
+                        {"status": "error", "message": 
response_from_hook["message"], "records": None}
+                    )
                     return
         except Exception as e:
             self.log.exception("Exception occurred while checking for query 
completion")
@@ -574,8 +588,8 @@ class BigQueryTablePartitionExistenceTrigger(BigQueryTableExistenceTrigger):
         job_id = None
         while True:
             if job_id is not None:
-                status = await hook.get_job_status(job_id=job_id, 
project_id=self.project_id)
-                if status == "success":
+                job_status = await hook.get_job_status(job_id=job_id, 
project_id=self.project_id)
+                if job_status["status"] == "success":
                     is_partition = await self._partition_exists(
                         hook=hook, job_id=job_id, project_id=self.project_id
                     )
@@ -588,8 +602,8 @@ class BigQueryTablePartitionExistenceTrigger(BigQueryTableExistenceTrigger):
                         )
                         return
                     job_id = None
-                elif status == "error":
-                    yield TriggerEvent({"status": "error", "message": status})
+                elif job_status["status"] == "error":
+                    yield TriggerEvent(job_status)
                     return
                 self.log.info("Sleeping for %s seconds.", self.poll_interval)
                 await asyncio.sleep(self.poll_interval)
diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py
index 7f461c9272..9791806436 100644
--- a/tests/providers/google/cloud/hooks/test_bigquery.py
+++ b/tests/providers/google/cloud/hooks/test_bigquery.py
@@ -2142,9 +2142,12 @@ class TestBigQueryAsyncHookMethods(_BigQueryBaseAsyncTestClass):
     @pytest.mark.parametrize(
         "job_status, expected",
         [
-            ({"status": {"state": "DONE"}}, "success"),
-            ({"status": {"state": "DONE", "errorResult": "Timeout"}}, "error"),
-            ({"status": {"state": "running"}}, "running"),
+            ({"status": {"state": "DONE"}}, {"status": "success", "message": 
"Job completed"}),
+            (
+                {"status": {"state": "DONE", "errorResult": {"message": 
"Timeout"}}},
+                {"status": "error", "message": "Timeout"},
+            ),
+            ({"status": {"state": "running"}}, {"status": "running", 
"message": "Job running"}),
         ],
     )
     @pytest.mark.asyncio
diff --git a/tests/providers/google/cloud/triggers/test_bigquery.py b/tests/providers/google/cloud/triggers/test_bigquery.py
index 25b375b3d6..81309161b0 100644
--- a/tests/providers/google/cloud/triggers/test_bigquery.py
+++ b/tests/providers/google/cloud/triggers/test_bigquery.py
@@ -165,7 +165,7 @@ class TestBigQueryInsertJobTrigger:
         """
         Tests the BigQueryInsertJobTrigger only fires once the query execution 
reaches a successful state.
         """
-        mock_job_status.return_value = "success"
+        mock_job_status.return_value = {"status": "success", "message": "Job 
completed"}
 
         generator = insert_job_trigger.run()
         actual = await generator.asend(None)
@@ -198,13 +198,16 @@ class TestBigQueryInsertJobTrigger:
     
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
     async def test_bigquery_op_trigger_terminated(self, mock_job_status, 
caplog, insert_job_trigger):
         """Test that BigQuery Triggers fire the correct event in case of an 
error."""
-        # Set the status to a value other than success or pending
-
-        mock_job_status.return_value = "error"
+        mock_job_status.return_value = {
+            "status": "error",
+            "message": "The conn_id `bq_default` isn't defined",
+        }
 
         generator = insert_job_trigger.run()
         actual = await generator.asend(None)
-        assert TriggerEvent({"status": "error"}) == actual
+        assert (
+            TriggerEvent({"status": "error", "message": "The conn_id 
`bq_default` isn't defined"}) == actual
+        )
 
     @pytest.mark.asyncio
     
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
@@ -240,7 +243,7 @@ class TestBigQueryGetDataTrigger:
 
         mock_job_client = AsyncMock(Job)
         mock_job_instance.return_value = mock_job_client
-        mock_job_instance.return_value.get_job.return_value = {"status": 
{"state": "RUNNING"}}
+        mock_job_instance.return_value.get_job.return_value = {"status": 
{"state": "running"}}
         caplog.set_level(logging.INFO)
 
         task = asyncio.create_task(get_data_trigger.run().__anext__())
@@ -260,11 +263,16 @@ class TestBigQueryGetDataTrigger:
         """Test that BigQuery Triggers fire the correct event in case of an 
error."""
         # Set the status to a value other than success or pending
 
-        mock_job_status.return_value = "error"
+        mock_job_status.return_value = {
+            "status": "error",
+            "message": "The conn_id `bq_default` isn't defined",
+        }
 
         generator = get_data_trigger.run()
         actual = await generator.asend(None)
-        assert TriggerEvent({"status": "error"}) == actual
+        assert (
+            TriggerEvent({"status": "error", "message": "The conn_id 
`bq_default` isn't defined"}) == actual
+        )
 
     @pytest.mark.asyncio
     
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
@@ -285,7 +293,7 @@ class TestBigQueryGetDataTrigger:
         """
         Tests that BigQueryGetDataTrigger only fires once the query execution 
reaches a successful state.
         """
-        mock_job_status.return_value = "success"
+        mock_job_status.return_value = {"status": "success", "message": "Job 
completed"}
         mock_job_output.return_value = {
             "kind": "bigquery#tableDataList",
             "etag": "test_etag",
@@ -316,7 +324,7 @@ class TestBigQueryGetDataTrigger:
             TriggerEvent(
                 {
                     "status": "success",
-                    "message": "success",
+                    "message": "Job completed",
                     "records": [[42, "monthy python"], [42, "fishy fish"]],
                 }
             )
@@ -353,11 +361,16 @@ class TestBigQueryCheckTrigger:
         """Test that BigQuery Triggers fire the correct event in case of an 
error."""
         # Set the status to a value other than success or pending
 
-        mock_job_status.return_value = "error"
+        mock_job_status.return_value = {
+            "status": "error",
+            "message": "The conn_id `bq_default` isn't defined",
+        }
 
         generator = check_trigger.run()
         actual = await generator.asend(None)
-        assert TriggerEvent({"status": "error", "message": "error"}) == actual
+        assert (
+            TriggerEvent({"status": "error", "message": "The conn_id 
`bq_default` isn't defined"}) == actual
+        )
 
     @pytest.mark.asyncio
     
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
@@ -390,7 +403,7 @@ class TestBigQueryCheckTrigger:
         """
         Test the BigQueryCheckTrigger only fires once the query execution 
reaches a successful state.
         """
-        mock_job_status.return_value = "success"
+        mock_job_status.return_value = {"status": "success", "message": "Job 
completed"}
         mock_job_output.return_value = {
             "kind": "bigquery#getQueryResultsResponse",
             "etag": "test_etag",
@@ -410,17 +423,16 @@ class TestBigQueryCheckTrigger:
         generator = check_trigger.run()
         actual = await generator.asend(None)
 
-        assert TriggerEvent({"status": "success", "records": [22]}) == actual
+        assert TriggerEvent({"status": "success", "message": "Job completed", 
"records": [22]}) == actual
 
     @pytest.mark.asyncio
     
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
     
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
     async def test_check_trigger_success_without_data(self, mock_job_output, 
mock_job_status, check_trigger):
         """
-        Tests that BigQueryCheckTrigger sends TriggerEvent as  { "status": 
"success", "records": None}
-        when no rows are available in the query result.
+        Tests that BigQueryCheckTrigger sends TriggerEvent when no rows are 
available in the query result.
         """
-        mock_job_status.return_value = "success"
+        mock_job_status.return_value = {"status": "success", "message": "Job 
completed"}
         mock_job_output.return_value = {
             "kind": "bigquery#getQueryResultsResponse",
             "etag": "test_etag",
@@ -444,7 +456,7 @@ class TestBigQueryCheckTrigger:
 
         generator = check_trigger.run()
         actual = await generator.asend(None)
-        assert TriggerEvent({"status": "success", "records": None}) == actual
+        assert TriggerEvent({"status": "success", "message": "Job completed", 
"records": None}) == actual
 
 
 class TestBigQueryIntervalCheckTrigger:
@@ -477,7 +489,7 @@ class TestBigQueryIntervalCheckTrigger:
         """
         Tests the BigQueryIntervalCheckTrigger only fires once the query 
execution reaches a successful state.
         """
-        mock_job_status.return_value = "success"
+        mock_job_status.return_value = {"status": "success", "message": "Job 
completed"}
         mock_get_job_output.return_value = ["0"]
 
         generator = interval_check_trigger.run()
@@ -490,7 +502,7 @@ class TestBigQueryIntervalCheckTrigger:
         """
         Tests that the BigQueryIntervalCheckTrigger do not fire while a query 
is still running.
         """
-        mock_job_status.return_value = "pending"
+        mock_job_status.return_value = {"status": "pending", "message": "Job 
pending"}
         caplog.set_level(logging.INFO)
 
         task = asyncio.create_task(interval_check_trigger.run().__anext__())
@@ -510,12 +522,20 @@ class TestBigQueryIntervalCheckTrigger:
     async def test_interval_check_trigger_terminated(self, mock_job_status, 
interval_check_trigger):
         """Tests the BigQueryIntervalCheckTrigger fires the correct event in 
case of an error."""
         # Set the status to a value other than success or pending
-        mock_job_status.return_value = "error"
+        mock_job_status.return_value = {
+            "status": "error",
+            "message": "The conn_id `bq_default` isn't defined",
+        }
 
         generator = interval_check_trigger.run()
         actual = await generator.asend(None)
 
-        assert TriggerEvent({"status": "error", "message": "error", "data": 
None}) == actual
+        assert (
+            TriggerEvent(
+                {"status": "error", "message": "The conn_id `bq_default` isn't 
defined", "data": None}
+            )
+            == actual
+        )
 
     @pytest.mark.asyncio
     
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
@@ -559,7 +579,7 @@ class TestBigQueryValueCheckTrigger:
         """
         Tests BigQueryValueCheckTrigger only fires once the query execution 
reaches a successful state.
         """
-        mock_job_status.return_value = "success"
+        mock_job_status.return_value = {"status": "success", "message": "Job 
completed"}
         get_job_output.return_value = {}
         get_records.return_value = [[2], [4]]
 
@@ -576,7 +596,7 @@ class TestBigQueryValueCheckTrigger:
         """
         Tests BigQueryValueCheckTrigger only fires once the query execution 
reaches a successful state.
         """
-        mock_job_status.return_value = "pending"
+        mock_job_status.return_value = {"status": "pending", "message": "Job 
pending"}
         caplog.set_level(logging.INFO)
 
         task = asyncio.create_task(value_check_trigger.run().__anext__())
@@ -598,7 +618,7 @@ class TestBigQueryValueCheckTrigger:
         """
         Tests BigQueryValueCheckTrigger only fires once the query execution 
reaches a successful state.
         """
-        mock_job_status.return_value = "dummy"
+        mock_job_status.return_value = {"status": "error", "message": "dummy"}
 
         generator = value_check_trigger.run()
         actual = await generator.asend(None)

Reply via email to