This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 81b85ebcbd Fix `BigQueryInsertJobOperator` not exiting deferred state (#31591)
81b85ebcbd is described below
commit 81b85ebcbd241e1909793d7480aabc81777b225c
Author: Pankaj Singh <[email protected]>
AuthorDate: Sat Jul 29 13:03:56 2023 +0530
Fix `BigQueryInsertJobOperator` not exiting deferred state (#31591)
---
airflow/providers/google/cloud/hooks/bigquery.py | 31 ++++-------
.../providers/google/cloud/triggers/bigquery.py | 63 +++++++++++-----------
.../providers/google/cloud/hooks/test_bigquery.py | 33 ++++--------
.../google/cloud/triggers/test_bigquery.py | 20 +++----
4 files changed, 59 insertions(+), 88 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 01c00046d1..12c084eba5 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -3103,29 +3103,16 @@ class BigQueryAsyncHook(GoogleBaseAsyncHook):
with await self.service_file_as_context() as f:
return Job(job_id=job_id, project=project_id, service_file=f, session=cast(Session, session))
- async def get_job_status(
- self,
- job_id: str | None,
- project_id: str | None = None,
- ) -> str | None:
- """Poll for job status asynchronously using gcloud-aio.
-
- Note that an OSError is raised when Job results are still pending.
- Exception means that Job finished with errors
- """
+ async def get_job_status(self, job_id: str | None, project_id: str | None = None) -> str:
async with ClientSession() as s:
- try:
- self.log.info("Executing get_job_status...")
- job_client = await self.get_job_instance(project_id, job_id, s)
- job_status_response = await job_client.result(cast(Session, s))
- if job_status_response:
- job_status = "success"
- except OSError:
- job_status = "pending"
- except Exception as e:
- self.log.info("Query execution finished with errors...")
- job_status = str(e)
- return job_status
+ job_client = await self.get_job_instance(project_id, job_id, s)
+ job = await job_client.get_job()
+ status = job.get("status", {})
+ if status["state"] == "DONE":
+ if "errorResult" in status:
+ return "error"
+ return "success"
+ return status["state"].lower()
async def get_job_output(
self,
diff --git a/airflow/providers/google/cloud/triggers/bigquery.py b/airflow/providers/google/cloud/triggers/bigquery.py
index b1c659a845..edafddf16e 100644
--- a/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/airflow/providers/google/cloud/triggers/bigquery.py
@@ -72,31 +72,29 @@ class BigQueryInsertJobTrigger(BaseTrigger):
)
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
+ """Gets current job execution status and yields a TriggerEvent."""
"""Gets current job execution status and yields a TriggerEvent."""
hook = self._get_async_hook()
while True:
try:
- # Poll for job execution status
- response_from_hook = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
- self.log.debug("Response from hook: %s", response_from_hook)
-
- if response_from_hook == "success":
+ job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
+ if job_status == "success":
yield TriggerEvent(
{
"job_id": self.job_id,
- "status": "success",
+ "status": job_status,
"message": "Job completed",
}
)
return
- elif response_from_hook == "pending":
- self.log.info("Query is still running...")
- self.log.info("Sleeping for %s seconds.", self.poll_interval)
- await asyncio.sleep(self.poll_interval)
- else:
- yield TriggerEvent({"status": "error", "message": response_from_hook})
+ elif job_status == "error":
+ yield TriggerEvent({"status": "error"})
return
-
+ else:
+ self.log.info(
+ "Bigquery job status is %s. Sleeping for %s seconds.", job_status, self.poll_interval
+ )
+ await asyncio.sleep(self.poll_interval)
except Exception as e:
self.log.exception("Exception occurred while checking for query completion")
yield TriggerEvent({"status": "error", "message": str(e)})
@@ -129,8 +127,8 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
while True:
try:
# Poll for job execution status
- response_from_hook = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
- if response_from_hook == "success":
+ job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
+ if job_status == "success":
query_results = await hook.get_job_output(job_id=self.job_id, project_id=self.project_id)
records = hook.get_records(query_results)
@@ -154,14 +152,14 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
}
)
return
-
- elif response_from_hook == "pending":
- self.log.info("Query is still running...")
- self.log.info("Sleeping for %s seconds.", self.poll_interval)
- await asyncio.sleep(self.poll_interval)
- else:
- yield TriggerEvent({"status": "error", "message": response_from_hook})
+ elif job_status == "error":
+ yield TriggerEvent({"status": "error", "message": job_status})
return
+ else:
+ self.log.info(
+ "Bigquery job status is %s. Sleeping for %s seconds.", job_status, self.poll_interval
+ )
+ await asyncio.sleep(self.poll_interval)
except Exception as e:
self.log.exception("Exception occurred while checking for query completion")
yield TriggerEvent({"status": "error", "message": str(e)})
@@ -201,26 +199,27 @@ class BigQueryGetDataTrigger(BigQueryInsertJobTrigger):
while True:
try:
# Poll for job execution status
- response_from_hook = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
- if response_from_hook == "success":
+ job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
+ if job_status == "success":
query_results = await hook.get_job_output(job_id=self.job_id, project_id=self.project_id)
records = hook.get_records(query_results=query_results, as_dict=self.as_dict)
- self.log.debug("Response from hook: %s", response_from_hook)
+ self.log.debug("Response from hook: %s", job_status)
yield TriggerEvent(
{
"status": "success",
- "message": response_from_hook,
+ "message": job_status,
"records": records,
}
)
return
- elif response_from_hook == "pending":
- self.log.info("Query is still running...")
- self.log.info("Sleeping for %s seconds.", self.poll_interval)
- await asyncio.sleep(self.poll_interval)
- else:
- yield TriggerEvent({"status": "error", "message": response_from_hook})
+ elif job_status == "error":
+ yield TriggerEvent({"status": "error"})
return
+ else:
+ self.log.info(
+ "Bigquery job status is %s. Sleeping for %s seconds.", job_status, self.poll_interval
+ )
+ await asyncio.sleep(self.poll_interval)
except Exception as e:
self.log.exception("Exception occurred while checking for query completion")
yield TriggerEvent({"status": "error", "message": str(e)})
diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py
index 0711b1b550..16207118e0 100644
--- a/tests/providers/google/cloud/hooks/test_bigquery.py
+++ b/tests/providers/google/cloud/hooks/test_bigquery.py
@@ -2139,34 +2139,23 @@ class TestBigQueryAsyncHookMethods(_BigQueryBaseAsyncTestClass):
result = await hook.get_job_instance(project_id=PROJECT_ID, job_id=JOB_ID, session=mock_session)
assert isinstance(result, Job)
+ @pytest.mark.parametrize(
+ "job_status, expected",
+ [
+ ({"status": {"state": "DONE"}}, "success"),
+ ({"status": {"state": "DONE", "errorResult": "Timeout"}}, "error"),
+ ({"status": {"state": "running"}}, "running"),
+ ],
+ )
@pytest.mark.asyncio
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance")
- async def test_get_job_status_success(self, mock_job_instance):
+ async def test_get_job_status(self, mock_job_instance, job_status, expected):
hook = BigQueryAsyncHook()
mock_job_client = AsyncMock(Job)
mock_job_instance.return_value = mock_job_client
- response = "success"
- mock_job_instance.return_value.result.return_value = response
+ mock_job_instance.return_value.get_job.return_value = job_status
resp = await hook.get_job_status(job_id=JOB_ID, project_id=PROJECT_ID)
- assert resp == response
-
- @pytest.mark.asyncio
- @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance")
- async def test_get_job_status_oserror(self, mock_job_instance):
- """Assets that the BigQueryAsyncHook returns a pending response when OSError is raised"""
- mock_job_instance.return_value.result.side_effect = OSError()
- hook = BigQueryAsyncHook()
- job_status = await hook.get_job_status(job_id=JOB_ID, project_id=PROJECT_ID)
- assert job_status == "pending"
-
- @pytest.mark.asyncio
- @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance")
- async def test_get_job_status_exception(self, mock_job_instance, caplog):
- """Assets that the logging is done correctly when BigQueryAsyncHook raises Exception"""
- mock_job_instance.return_value.result.side_effect = Exception()
- hook = BigQueryAsyncHook()
- await hook.get_job_status(job_id=JOB_ID, project_id=PROJECT_ID)
- assert "Query execution finished with errors..." in caplog.text
+ assert resp == expected
@pytest.mark.asyncio
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance")
diff --git a/tests/providers/google/cloud/triggers/test_bigquery.py b/tests/providers/google/cloud/triggers/test_bigquery.py
index b1acd7b530..25b375b3d6 100644
--- a/tests/providers/google/cloud/triggers/test_bigquery.py
+++ b/tests/providers/google/cloud/triggers/test_bigquery.py
@@ -180,7 +180,7 @@ class TestBigQueryInsertJobTrigger:
mock_job_client = AsyncMock(Job)
mock_job_instance.return_value = mock_job_client
- mock_job_instance.return_value.result.side_effect = OSError
+ mock_job_instance.return_value.get_job.return_value = {"status": {"state": "running"}}
caplog.set_level(logging.INFO)
task = asyncio.create_task(insert_job_trigger.run().__anext__())
@@ -189,8 +189,7 @@ class TestBigQueryInsertJobTrigger:
# TriggerEvent was not returned
assert task.done() is False
- assert "Query is still running..." in caplog.text
- assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
+ assert "Bigquery job status is running. Sleeping for 4.0 seconds." in caplog.text
# Prevents error when task is destroyed while in "pending" state
asyncio.get_event_loop().stop()
@@ -205,7 +204,7 @@ class TestBigQueryInsertJobTrigger:
generator = insert_job_trigger.run()
actual = await generator.asend(None)
- assert TriggerEvent({"status": "error", "message": "error"}) == actual
+ assert TriggerEvent({"status": "error"}) == actual
@pytest.mark.asyncio
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
@@ -241,7 +240,7 @@ class TestBigQueryGetDataTrigger:
mock_job_client = AsyncMock(Job)
mock_job_instance.return_value = mock_job_client
- mock_job_instance.return_value.result.side_effect = OSError
+ mock_job_instance.return_value.get_job.return_value = {"status": {"state": "RUNNING"}}
caplog.set_level(logging.INFO)
task = asyncio.create_task(get_data_trigger.run().__anext__())
@@ -250,8 +249,7 @@ class TestBigQueryGetDataTrigger:
# TriggerEvent was not returned
assert task.done() is False
- assert "Query is still running..." in caplog.text
- assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
+ assert "Bigquery job status is running. Sleeping for 4.0 seconds." in caplog.text
# Prevents error when task is destroyed while in "pending" state
asyncio.get_event_loop().stop()
@@ -266,7 +264,7 @@ class TestBigQueryGetDataTrigger:
generator = get_data_trigger.run()
actual = await generator.asend(None)
- assert TriggerEvent({"status": "error", "message": "error"}) == actual
+ assert TriggerEvent({"status": "error"}) == actual
@pytest.mark.asyncio
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
@@ -336,8 +334,7 @@ class TestBigQueryCheckTrigger:
mock_job_client = AsyncMock(Job)
mock_job_instance.return_value = mock_job_client
- mock_job_instance.return_value.result.side_effect = OSError
- caplog.set_level(logging.INFO)
+ mock_job_instance.return_value.get_job.return_value = {"status": {"state": "running"}}
task = asyncio.create_task(check_trigger.run().__anext__())
await asyncio.sleep(0.5)
@@ -345,8 +342,7 @@ class TestBigQueryCheckTrigger:
# TriggerEvent was not returned
assert task.done() is False
- assert "Query is still running..." in caplog.text
- assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
+ assert "Bigquery job status is running. Sleeping for 4.0 seconds." in caplog.text
# Prevents error when task is destroyed while in "pending" state
asyncio.get_event_loop().stop()