This is an automated email from the ASF dual-hosted git repository.
pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 774125ae25 fix: BigQuery job error message (#34208)
774125ae25 is described below
commit 774125ae253611627229509e672518ce0a58cf2e
Author: Nathan Hadfield <[email protected]>
AuthorDate: Sat Sep 9 07:56:26 2023 +0100
fix: BigQuery job error message (#34208)
* Modified output to return a dict that also contains the job message
* Modified async functions to handle the dict response from `get_job_status`
* Updated expected states to reflect dict output
* Updated tests to check dictionary output
---
airflow/providers/google/cloud/hooks/bigquery.py | 8 +--
.../providers/google/cloud/triggers/bigquery.py | 74 +++++++++++++---------
.../providers/google/cloud/hooks/test_bigquery.py | 9 ++-
.../google/cloud/triggers/test_bigquery.py | 70 ++++++++++++--------
4 files changed, 99 insertions(+), 62 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py
b/airflow/providers/google/cloud/hooks/bigquery.py
index 18ad159a98..c98e7e92ab 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -3099,16 +3099,16 @@ class BigQueryAsyncHook(GoogleBaseAsyncHook):
with await self.service_file_as_context() as f:
return Job(job_id=job_id, project=project_id, service_file=f,
session=cast(Session, session))
- async def get_job_status(self, job_id: str | None, project_id: str | None
= None) -> str:
+ async def get_job_status(self, job_id: str | None, project_id: str | None
= None) -> dict[str, str]:
async with ClientSession() as s:
job_client = await self.get_job_instance(project_id, job_id, s)
job = await job_client.get_job()
status = job.get("status", {})
if status["state"] == "DONE":
if "errorResult" in status:
- return "error"
- return "success"
- return status["state"].lower()
+ return {"status": "error", "message":
status["errorResult"]["message"]}
+ return {"status": "success", "message": "Job completed"}
+ return {"status": status["state"].lower(), "message": "Job
running"}
async def get_job_output(
self,
diff --git a/airflow/providers/google/cloud/triggers/bigquery.py
b/airflow/providers/google/cloud/triggers/bigquery.py
index 1f80479d8b..5bd9cc8385 100644
--- a/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/airflow/providers/google/cloud/triggers/bigquery.py
@@ -72,27 +72,28 @@ class BigQueryInsertJobTrigger(BaseTrigger):
)
async def run(self) -> AsyncIterator[TriggerEvent]: # type:
ignore[override]
- """Gets current job execution status and yields a TriggerEvent."""
"""Gets current job execution status and yields a TriggerEvent."""
hook = self._get_async_hook()
try:
while True:
job_status = await hook.get_job_status(job_id=self.job_id,
project_id=self.project_id)
- if job_status == "success":
+ if job_status["status"] == "success":
yield TriggerEvent(
{
"job_id": self.job_id,
- "status": job_status,
- "message": "Job completed",
+ "status": job_status["status"],
+ "message": job_status["message"],
}
)
return
- elif job_status == "error":
- yield TriggerEvent({"status": "error"})
+ elif job_status["status"] == "error":
+ yield TriggerEvent(job_status)
return
else:
self.log.info(
- "Bigquery job status is %s. Sleeping for %s seconds.",
job_status, self.poll_interval
+ "Bigquery job status is %s. Sleeping for %s seconds.",
+ job_status["status"],
+ self.poll_interval,
)
await asyncio.sleep(self.poll_interval)
except Exception as e:
@@ -127,16 +128,16 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
while True:
# Poll for job execution status
job_status = await hook.get_job_status(job_id=self.job_id,
project_id=self.project_id)
- if job_status == "success":
+ if job_status["status"] == "success":
query_results = await
hook.get_job_output(job_id=self.job_id, project_id=self.project_id)
-
records = hook.get_records(query_results)
# If empty list, then no records are available
if not records:
yield TriggerEvent(
{
- "status": "success",
+ "status": job_status["status"],
+ "message": job_status["message"],
"records": None,
}
)
@@ -146,17 +147,20 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
first_record = records.pop(0)
yield TriggerEvent(
{
- "status": "success",
+ "status": job_status["status"],
+ "message": job_status["message"],
"records": first_record,
}
)
return
- elif job_status == "error":
- yield TriggerEvent({"status": "error", "message":
job_status})
+ elif job_status["status"] == "error":
+ yield TriggerEvent({"status": "error", "message":
job_status["message"]})
return
else:
self.log.info(
- "Bigquery job status is %s. Sleeping for %s seconds.",
job_status, self.poll_interval
+ "Bigquery job status is %s. Sleeping for %s seconds.",
+ job_status["status"],
+ self.poll_interval,
)
await asyncio.sleep(self.poll_interval)
except Exception as e:
@@ -198,24 +202,26 @@ class BigQueryGetDataTrigger(BigQueryInsertJobTrigger):
while True:
# Poll for job execution status
job_status = await hook.get_job_status(job_id=self.job_id,
project_id=self.project_id)
- if job_status == "success":
+ if job_status["status"] == "success":
query_results = await
hook.get_job_output(job_id=self.job_id, project_id=self.project_id)
records = hook.get_records(query_results=query_results,
as_dict=self.as_dict)
- self.log.debug("Response from hook: %s", job_status)
+ self.log.debug("Response from hook: %s",
job_status["status"])
yield TriggerEvent(
{
"status": "success",
- "message": job_status,
+ "message": job_status["message"],
"records": records,
}
)
return
- elif job_status == "error":
- yield TriggerEvent({"status": "error"})
+ elif job_status["status"] == "error":
+ yield TriggerEvent(job_status)
return
else:
self.log.info(
- "Bigquery job status is %s. Sleeping for %s seconds.",
job_status, self.poll_interval
+ "Bigquery job status is %s. Sleeping for %s seconds.",
+ job_status["status"],
+ self.poll_interval,
)
await asyncio.sleep(self.poll_interval)
except Exception as e:
@@ -308,7 +314,10 @@ class
BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
job_id=self.second_job_id, project_id=self.project_id
)
- if first_job_response_from_hook == "success" and
second_job_response_from_hook == "success":
+ if (
+ first_job_response_from_hook["status"] == "success"
+ and second_job_response_from_hook["status"] == "success"
+ ):
first_query_results = await hook.get_job_output(
job_id=self.first_job_id, project_id=self.project_id
)
@@ -352,13 +361,16 @@ class
BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
}
)
return
- elif first_job_response_from_hook == "pending" or
second_job_response_from_hook == "pending":
+ elif (
+ first_job_response_from_hook["status"] == "pending"
+ or second_job_response_from_hook["status"] == "pending"
+ ):
self.log.info("Query is still running...")
self.log.info("Sleeping for %s seconds.",
self.poll_interval)
await asyncio.sleep(self.poll_interval)
else:
yield TriggerEvent(
- {"status": "error", "message":
second_job_response_from_hook, "data": None}
+ {"status": "error", "message":
second_job_response_from_hook["message"], "data": None}
)
return
@@ -430,19 +442,21 @@ class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
while True:
# Poll for job execution status
response_from_hook = await
hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
- if response_from_hook == "success":
+ if response_from_hook["status"] == "success":
query_results = await
hook.get_job_output(job_id=self.job_id, project_id=self.project_id)
records = hook.get_records(query_results)
records = records.pop(0) if records else None
hook.value_check(self.sql, self.pass_value, records,
self.tolerance)
yield TriggerEvent({"status": "success", "message": "Job
completed", "records": records})
return
- elif response_from_hook == "pending":
+ elif response_from_hook["status"] == "pending":
self.log.info("Query is still running...")
self.log.info("Sleeping for %s seconds.",
self.poll_interval)
await asyncio.sleep(self.poll_interval)
else:
- yield TriggerEvent({"status": "error", "message":
response_from_hook, "records": None})
+ yield TriggerEvent(
+ {"status": "error", "message":
response_from_hook["message"], "records": None}
+ )
return
except Exception as e:
self.log.exception("Exception occurred while checking for query
completion")
@@ -574,8 +588,8 @@ class
BigQueryTablePartitionExistenceTrigger(BigQueryTableExistenceTrigger):
job_id = None
while True:
if job_id is not None:
- status = await hook.get_job_status(job_id=job_id,
project_id=self.project_id)
- if status == "success":
+ job_status = await hook.get_job_status(job_id=job_id,
project_id=self.project_id)
+ if job_status["status"] == "success":
is_partition = await self._partition_exists(
hook=hook, job_id=job_id, project_id=self.project_id
)
@@ -588,8 +602,8 @@ class
BigQueryTablePartitionExistenceTrigger(BigQueryTableExistenceTrigger):
)
return
job_id = None
- elif status == "error":
- yield TriggerEvent({"status": "error", "message": status})
+ elif job_status["status"] == "error":
+ yield TriggerEvent(job_status)
return
self.log.info("Sleeping for %s seconds.", self.poll_interval)
await asyncio.sleep(self.poll_interval)
diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py
b/tests/providers/google/cloud/hooks/test_bigquery.py
index 7f461c9272..9791806436 100644
--- a/tests/providers/google/cloud/hooks/test_bigquery.py
+++ b/tests/providers/google/cloud/hooks/test_bigquery.py
@@ -2142,9 +2142,12 @@ class
TestBigQueryAsyncHookMethods(_BigQueryBaseAsyncTestClass):
@pytest.mark.parametrize(
"job_status, expected",
[
- ({"status": {"state": "DONE"}}, "success"),
- ({"status": {"state": "DONE", "errorResult": "Timeout"}}, "error"),
- ({"status": {"state": "running"}}, "running"),
+ ({"status": {"state": "DONE"}}, {"status": "success", "message":
"Job completed"}),
+ (
+ {"status": {"state": "DONE", "errorResult": {"message":
"Timeout"}}},
+ {"status": "error", "message": "Timeout"},
+ ),
+ ({"status": {"state": "running"}}, {"status": "running",
"message": "Job running"}),
],
)
@pytest.mark.asyncio
diff --git a/tests/providers/google/cloud/triggers/test_bigquery.py
b/tests/providers/google/cloud/triggers/test_bigquery.py
index 25b375b3d6..81309161b0 100644
--- a/tests/providers/google/cloud/triggers/test_bigquery.py
+++ b/tests/providers/google/cloud/triggers/test_bigquery.py
@@ -165,7 +165,7 @@ class TestBigQueryInsertJobTrigger:
"""
Tests the BigQueryInsertJobTrigger only fires once the query execution
reaches a successful state.
"""
- mock_job_status.return_value = "success"
+ mock_job_status.return_value = {"status": "success", "message": "Job
completed"}
generator = insert_job_trigger.run()
actual = await generator.asend(None)
@@ -198,13 +198,16 @@ class TestBigQueryInsertJobTrigger:
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
async def test_bigquery_op_trigger_terminated(self, mock_job_status,
caplog, insert_job_trigger):
"""Test that BigQuery Triggers fire the correct event in case of an
error."""
- # Set the status to a value other than success or pending
-
- mock_job_status.return_value = "error"
+ mock_job_status.return_value = {
+ "status": "error",
+ "message": "The conn_id `bq_default` isn't defined",
+ }
generator = insert_job_trigger.run()
actual = await generator.asend(None)
- assert TriggerEvent({"status": "error"}) == actual
+ assert (
+ TriggerEvent({"status": "error", "message": "The conn_id
`bq_default` isn't defined"}) == actual
+ )
@pytest.mark.asyncio
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
@@ -240,7 +243,7 @@ class TestBigQueryGetDataTrigger:
mock_job_client = AsyncMock(Job)
mock_job_instance.return_value = mock_job_client
- mock_job_instance.return_value.get_job.return_value = {"status":
{"state": "RUNNING"}}
+ mock_job_instance.return_value.get_job.return_value = {"status":
{"state": "running"}}
caplog.set_level(logging.INFO)
task = asyncio.create_task(get_data_trigger.run().__anext__())
@@ -260,11 +263,16 @@ class TestBigQueryGetDataTrigger:
"""Test that BigQuery Triggers fire the correct event in case of an
error."""
# Set the status to a value other than success or pending
- mock_job_status.return_value = "error"
+ mock_job_status.return_value = {
+ "status": "error",
+ "message": "The conn_id `bq_default` isn't defined",
+ }
generator = get_data_trigger.run()
actual = await generator.asend(None)
- assert TriggerEvent({"status": "error"}) == actual
+ assert (
+ TriggerEvent({"status": "error", "message": "The conn_id
`bq_default` isn't defined"}) == actual
+ )
@pytest.mark.asyncio
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
@@ -285,7 +293,7 @@ class TestBigQueryGetDataTrigger:
"""
Tests that BigQueryGetDataTrigger only fires once the query execution
reaches a successful state.
"""
- mock_job_status.return_value = "success"
+ mock_job_status.return_value = {"status": "success", "message": "Job
completed"}
mock_job_output.return_value = {
"kind": "bigquery#tableDataList",
"etag": "test_etag",
@@ -316,7 +324,7 @@ class TestBigQueryGetDataTrigger:
TriggerEvent(
{
"status": "success",
- "message": "success",
+ "message": "Job completed",
"records": [[42, "monthy python"], [42, "fishy fish"]],
}
)
@@ -353,11 +361,16 @@ class TestBigQueryCheckTrigger:
"""Test that BigQuery Triggers fire the correct event in case of an
error."""
# Set the status to a value other than success or pending
- mock_job_status.return_value = "error"
+ mock_job_status.return_value = {
+ "status": "error",
+ "message": "The conn_id `bq_default` isn't defined",
+ }
generator = check_trigger.run()
actual = await generator.asend(None)
- assert TriggerEvent({"status": "error", "message": "error"}) == actual
+ assert (
+ TriggerEvent({"status": "error", "message": "The conn_id
`bq_default` isn't defined"}) == actual
+ )
@pytest.mark.asyncio
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
@@ -390,7 +403,7 @@ class TestBigQueryCheckTrigger:
"""
Test the BigQueryCheckTrigger only fires once the query execution
reaches a successful state.
"""
- mock_job_status.return_value = "success"
+ mock_job_status.return_value = {"status": "success", "message": "Job
completed"}
mock_job_output.return_value = {
"kind": "bigquery#getQueryResultsResponse",
"etag": "test_etag",
@@ -410,17 +423,16 @@ class TestBigQueryCheckTrigger:
generator = check_trigger.run()
actual = await generator.asend(None)
- assert TriggerEvent({"status": "success", "records": [22]}) == actual
+ assert TriggerEvent({"status": "success", "message": "Job completed",
"records": [22]}) == actual
@pytest.mark.asyncio
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
async def test_check_trigger_success_without_data(self, mock_job_output,
mock_job_status, check_trigger):
"""
- Tests that BigQueryCheckTrigger sends TriggerEvent as { "status":
"success", "records": None}
- when no rows are available in the query result.
+ Tests that BigQueryCheckTrigger sends TriggerEvent when no rows are
available in the query result.
"""
- mock_job_status.return_value = "success"
+ mock_job_status.return_value = {"status": "success", "message": "Job
completed"}
mock_job_output.return_value = {
"kind": "bigquery#getQueryResultsResponse",
"etag": "test_etag",
@@ -444,7 +456,7 @@ class TestBigQueryCheckTrigger:
generator = check_trigger.run()
actual = await generator.asend(None)
- assert TriggerEvent({"status": "success", "records": None}) == actual
+ assert TriggerEvent({"status": "success", "message": "Job completed",
"records": None}) == actual
class TestBigQueryIntervalCheckTrigger:
@@ -477,7 +489,7 @@ class TestBigQueryIntervalCheckTrigger:
"""
Tests the BigQueryIntervalCheckTrigger only fires once the query
execution reaches a successful state.
"""
- mock_job_status.return_value = "success"
+ mock_job_status.return_value = {"status": "success", "message": "Job
completed"}
mock_get_job_output.return_value = ["0"]
generator = interval_check_trigger.run()
@@ -490,7 +502,7 @@ class TestBigQueryIntervalCheckTrigger:
"""
Tests that the BigQueryIntervalCheckTrigger do not fire while a query
is still running.
"""
- mock_job_status.return_value = "pending"
+ mock_job_status.return_value = {"status": "pending", "message": "Job
pending"}
caplog.set_level(logging.INFO)
task = asyncio.create_task(interval_check_trigger.run().__anext__())
@@ -510,12 +522,20 @@ class TestBigQueryIntervalCheckTrigger:
async def test_interval_check_trigger_terminated(self, mock_job_status,
interval_check_trigger):
"""Tests the BigQueryIntervalCheckTrigger fires the correct event in
case of an error."""
# Set the status to a value other than success or pending
- mock_job_status.return_value = "error"
+ mock_job_status.return_value = {
+ "status": "error",
+ "message": "The conn_id `bq_default` isn't defined",
+ }
generator = interval_check_trigger.run()
actual = await generator.asend(None)
- assert TriggerEvent({"status": "error", "message": "error", "data":
None}) == actual
+ assert (
+ TriggerEvent(
+ {"status": "error", "message": "The conn_id `bq_default` isn't
defined", "data": None}
+ )
+ == actual
+ )
@pytest.mark.asyncio
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
@@ -559,7 +579,7 @@ class TestBigQueryValueCheckTrigger:
"""
Tests BigQueryValueCheckTrigger only fires once the query execution
reaches a successful state.
"""
- mock_job_status.return_value = "success"
+ mock_job_status.return_value = {"status": "success", "message": "Job
completed"}
get_job_output.return_value = {}
get_records.return_value = [[2], [4]]
@@ -576,7 +596,7 @@ class TestBigQueryValueCheckTrigger:
"""
Tests BigQueryValueCheckTrigger only fires once the query execution
reaches a successful state.
"""
- mock_job_status.return_value = "pending"
+ mock_job_status.return_value = {"status": "pending", "message": "Job
pending"}
caplog.set_level(logging.INFO)
task = asyncio.create_task(value_check_trigger.run().__anext__())
@@ -598,7 +618,7 @@ class TestBigQueryValueCheckTrigger:
"""
Tests BigQueryValueCheckTrigger only fires once the query execution
reaches a successful state.
"""
- mock_job_status.return_value = "dummy"
+ mock_job_status.return_value = {"status": "error", "message": "dummy"}
generator = value_check_trigger.run()
actual = await generator.asend(None)