This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9692109c37 Improve Google provider trigger test (#30173)
9692109c37 is described below
commit 9692109c37e996ec31af33fbe205133bac5dc71a
Author: Pankaj Singh <[email protected]>
AuthorDate: Mon Mar 20 16:43:03 2023 +0530
Improve Google provider trigger test (#30173)
---
.../google/cloud/triggers/test_bigquery.py | 1445 ++++++++------------
.../google/cloud/triggers/test_bigquery_dts.py | 25 +-
.../google/cloud/triggers/test_datafusion.py | 15 +-
3 files changed, 591 insertions(+), 894 deletions(-)
diff --git a/tests/providers/google/cloud/triggers/test_bigquery.py
b/tests/providers/google/cloud/triggers/test_bigquery.py
index f4a7773b87..410aa14b1a 100644
--- a/tests/providers/google/cloud/triggers/test_bigquery.py
+++ b/tests/providers/google/cloud/triggers/test_bigquery.py
@@ -22,7 +22,7 @@ from typing import Any
import pytest
from aiohttp import ClientResponseError, RequestInfo
-from gcloud.aio.bigquery import Table
+from gcloud.aio.bigquery import Job, Table
from multidict import CIMultiDict
from yarl import URL
@@ -41,9 +41,6 @@ from tests.providers.google.cloud.utils.compat import
AsyncMock, async_mock
TEST_CONN_ID = "bq_default"
TEST_JOB_ID = "1234"
-RUN_ID = "1"
-RETRY_LIMIT = 2
-RETRY_DELAY = 1.0
TEST_GCP_PROJECT_ID = "test-project"
TEST_DATASET_ID = "bq_dataset"
TEST_TABLE_ID = "bq_table"
@@ -63,68 +60,9 @@ TEST_HOOK_PARAMS: dict[str, Any] = {}
TEST_PARTITION_ID = "1234"
-def test_bigquery_insert_job_op_trigger_serialization():
- """
- Asserts that the BigQueryInsertJobTrigger correctly serializes its
arguments
- and classpath.
- """
- trigger = BigQueryInsertJobTrigger(
- TEST_CONN_ID,
- TEST_JOB_ID,
- TEST_GCP_PROJECT_ID,
- TEST_DATASET_ID,
- TEST_TABLE_ID,
- POLLING_PERIOD_SECONDS,
- )
- classpath, kwargs = trigger.serialize()
- assert classpath ==
"airflow.providers.google.cloud.triggers.bigquery.BigQueryInsertJobTrigger"
- assert kwargs == {
- "conn_id": TEST_CONN_ID,
- "job_id": TEST_JOB_ID,
- "project_id": TEST_GCP_PROJECT_ID,
- "dataset_id": TEST_DATASET_ID,
- "table_id": TEST_TABLE_ID,
- "poll_interval": POLLING_PERIOD_SECONDS,
- }
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_insert_job_op_trigger_success(mock_job_status):
- """
- Tests the BigQueryInsertJobTrigger only fires once the query execution
reaches a successful state.
- """
- mock_job_status.return_value = "success"
-
- trigger = BigQueryInsertJobTrigger(
- TEST_CONN_ID,
- TEST_JOB_ID,
- TEST_GCP_PROJECT_ID,
- TEST_DATASET_ID,
- TEST_TABLE_ID,
- POLLING_PERIOD_SECONDS,
- )
-
- generator = trigger.run()
- actual = await generator.asend(None)
- assert TriggerEvent({"status": "success", "message": "Job completed",
"job_id": TEST_JOB_ID}) == actual
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance")
-async def test_bigquery_insert_job_trigger_running(mock_job_instance, caplog):
- """
- Test that BigQuery Triggers do not fire while a query is still running.
- """
-
- from gcloud.aio.bigquery import Job
-
- mock_job_client = AsyncMock(Job)
- mock_job_instance.return_value = mock_job_client
- mock_job_instance.return_value.result.side_effect = OSError
- caplog.set_level(logging.INFO)
-
- trigger = BigQueryInsertJobTrigger(
+@pytest.fixture
+def insert_job_trigger():
+ return BigQueryInsertJobTrigger(
conn_id=TEST_CONN_ID,
job_id=TEST_JOB_ID,
project_id=TEST_GCP_PROJECT_ID,
@@ -132,36 +70,11 @@ async def
test_bigquery_insert_job_trigger_running(mock_job_instance, caplog):
table_id=TEST_TABLE_ID,
poll_interval=POLLING_PERIOD_SECONDS,
)
- task = asyncio.create_task(trigger.run().__anext__())
- await asyncio.sleep(0.5)
-
- # TriggerEvent was not returned
- assert task.done() is False
-
- assert f"Using the connection {TEST_CONN_ID} ." in caplog.text
-
- assert "Query is still running..." in caplog.text
- assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
-
- # Prevents error when task is destroyed while in "pending" state
- asyncio.get_event_loop().stop()
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance")
-async def test_bigquery_get_data_trigger_running(mock_job_instance, caplog):
- """
- Test that BigQuery Triggers do not fire while a query is still running.
- """
- from gcloud.aio.bigquery import Job
- mock_job_client = AsyncMock(Job)
- mock_job_instance.return_value = mock_job_client
- mock_job_instance.return_value.result.side_effect = OSError
- caplog.set_level(logging.INFO)
-
- trigger = BigQueryGetDataTrigger(
+@pytest.fixture
+def get_data_trigger():
+ return BigQueryGetDataTrigger(
conn_id=TEST_CONN_ID,
job_id=TEST_JOB_ID,
project_id=TEST_GCP_PROJECT_ID,
@@ -169,69 +82,42 @@ async def
test_bigquery_get_data_trigger_running(mock_job_instance, caplog):
table_id=TEST_TABLE_ID,
poll_interval=POLLING_PERIOD_SECONDS,
)
- task = asyncio.create_task(trigger.run().__anext__())
- await asyncio.sleep(0.5)
-
- # TriggerEvent was not returned
- assert task.done() is False
-
- assert f"Using the connection {TEST_CONN_ID} ." in caplog.text
-
- assert "Query is still running..." in caplog.text
- assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
-
- # Prevents error when task is destroyed while in "pending" state
- asyncio.get_event_loop().stop()
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance")
-async def test_bigquery_check_trigger_running(mock_job_instance, caplog):
- """
- Test that BigQuery Triggers do not fire while a query is still running.
- """
- from gcloud.aio.bigquery import Job
+@pytest.fixture
+def table_existence_trigger():
+ return BigQueryTableExistenceTrigger(
+ TEST_GCP_PROJECT_ID,
+ TEST_DATASET_ID,
+ TEST_TABLE_ID,
+ TEST_GCP_CONN_ID,
+ TEST_HOOK_PARAMS,
+ POLLING_PERIOD_SECONDS,
+ )
- mock_job_client = AsyncMock(Job)
- mock_job_instance.return_value = mock_job_client
- mock_job_instance.return_value.result.side_effect = OSError
- caplog.set_level(logging.INFO)
- trigger = BigQueryCheckTrigger(
+@pytest.fixture
+def interval_check_trigger():
+ return BigQueryIntervalCheckTrigger(
conn_id=TEST_CONN_ID,
- job_id=TEST_JOB_ID,
+ first_job_id=TEST_FIRST_JOB_ID,
+ second_job_id=TEST_SECOND_JOB_ID,
project_id=TEST_GCP_PROJECT_ID,
+ table=TEST_TABLE_ID,
+ metrics_thresholds=TEST_METRIC_THRESHOLDS,
+ date_filter_column=TEST_DATE_FILTER_COLUMN,
+ days_back=TEST_DAYS_BACK,
+ ratio_formula=TEST_RATIO_FORMULA,
+ ignore_zero=TEST_IGNORE_ZERO,
dataset_id=TEST_DATASET_ID,
table_id=TEST_TABLE_ID,
poll_interval=POLLING_PERIOD_SECONDS,
)
- task = asyncio.create_task(trigger.run().__anext__())
- await asyncio.sleep(0.5)
- # TriggerEvent was not returned
- assert task.done() is False
- assert f"Using the connection {TEST_CONN_ID} ." in caplog.text
-
- assert "Query is still running..." in caplog.text
- assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
-
- # Prevents error when task is destroyed while in "pending" state
- asyncio.get_event_loop().stop()
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_op_trigger_terminated(mock_job_status, caplog):
- """
- Test that BigQuery Triggers fire the correct event in case of an error.
- """
- # Set the status to a value other than success or pending
-
- mock_job_status.return_value = "error"
-
- trigger = BigQueryInsertJobTrigger(
+@pytest.fixture
+def check_trigger():
+ return BigQueryCheckTrigger(
conn_id=TEST_CONN_ID,
job_id=TEST_JOB_ID,
project_id=TEST_GCP_PROJECT_ID,
@@ -240,815 +126,636 @@ async def
test_bigquery_op_trigger_terminated(mock_job_status, caplog):
poll_interval=POLLING_PERIOD_SECONDS,
)
- generator = trigger.run()
- actual = await generator.asend(None)
- assert TriggerEvent({"status": "error", "message": "error"}) == actual
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_check_trigger_terminated(mock_job_status, caplog):
- """
- Test that BigQuery Triggers fire the correct event in case of an error.
- """
- # Set the status to a value other than success or pending
-
- mock_job_status.return_value = "error"
-
- trigger = BigQueryCheckTrigger(
+@pytest.fixture
+def value_check_trigger():
+ return BigQueryValueCheckTrigger(
conn_id=TEST_CONN_ID,
+ pass_value=TEST_PASS_VALUE,
job_id=TEST_JOB_ID,
- project_id=TEST_GCP_PROJECT_ID,
dataset_id=TEST_DATASET_ID,
+ project_id=TEST_GCP_PROJECT_ID,
+ sql=TEST_SQL_QUERY,
table_id=TEST_TABLE_ID,
+ tolerance=TEST_TOLERANCE,
poll_interval=POLLING_PERIOD_SECONDS,
)
- generator = trigger.run()
- actual = await generator.asend(None)
- assert TriggerEvent({"status": "error", "message": "error"}) == actual
+class TestBigQueryInsertJobTrigger:
+ def test_serialization(self, insert_job_trigger):
+ """
+ Asserts that the BigQueryInsertJobTrigger correctly serializes its
arguments and classpath.
+ """
+ classpath, kwargs = insert_job_trigger.serialize()
+ assert classpath ==
"airflow.providers.google.cloud.triggers.bigquery.BigQueryInsertJobTrigger"
+ assert kwargs == {
+ "conn_id": TEST_CONN_ID,
+ "job_id": TEST_JOB_ID,
+ "project_id": TEST_GCP_PROJECT_ID,
+ "dataset_id": TEST_DATASET_ID,
+ "table_id": TEST_TABLE_ID,
+ "poll_interval": POLLING_PERIOD_SECONDS,
+ }
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_get_data_trigger_terminated(mock_job_status, caplog):
- """
- Test that BigQuery Triggers fire the correct event in case of an error.
- """
- # Set the status to a value other than success or pending
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ async def test_bigquery_insert_job_op_trigger_success(self,
mock_job_status, insert_job_trigger):
+ """
+ Tests the BigQueryInsertJobTrigger only fires once the query execution
reaches a successful state.
+ """
+ mock_job_status.return_value = "success"
- mock_job_status.return_value = "error"
+ generator = insert_job_trigger.run()
+ actual = await generator.asend(None)
+ assert (
+ TriggerEvent({"status": "success", "message": "Job completed",
"job_id": TEST_JOB_ID}) == actual
+ )
- trigger = BigQueryGetDataTrigger(
- conn_id=TEST_CONN_ID,
- job_id=TEST_JOB_ID,
- project_id=TEST_GCP_PROJECT_ID,
- dataset_id=TEST_DATASET_ID,
- table_id=TEST_TABLE_ID,
- poll_interval=POLLING_PERIOD_SECONDS,
- )
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance")
+ async def test_bigquery_insert_job_trigger_running(self,
mock_job_instance, caplog, insert_job_trigger):
+ """Test that BigQuery Triggers do not fire while a query is still
running."""
- generator = trigger.run()
- actual = await generator.asend(None)
- assert TriggerEvent({"status": "error", "message": "error"}) == actual
+ mock_job_client = AsyncMock(Job)
+ mock_job_instance.return_value = mock_job_client
+ mock_job_instance.return_value.result.side_effect = OSError
+ caplog.set_level(logging.INFO)
+ task = asyncio.create_task(insert_job_trigger.run().__anext__())
+ await asyncio.sleep(0.5)
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_op_trigger_exception(mock_job_status, caplog):
- """
- Test that BigQuery Triggers fire the correct event in case of an error.
- """
- mock_job_status.side_effect = Exception("Test exception")
+ # TriggerEvent was not returned
+ assert task.done() is False
- trigger = BigQueryInsertJobTrigger(
- conn_id=TEST_CONN_ID,
- job_id=TEST_JOB_ID,
- project_id=TEST_GCP_PROJECT_ID,
- dataset_id=TEST_DATASET_ID,
- table_id=TEST_TABLE_ID,
- poll_interval=POLLING_PERIOD_SECONDS,
- )
+ assert "Query is still running..." in caplog.text
+ assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
- generator = trigger.run()
- actual = await generator.asend(None)
- assert TriggerEvent({"status": "error", "message": "Test exception"}) ==
actual
+ # Prevents error when task is destroyed while in "pending" state
+ asyncio.get_event_loop().stop()
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ async def test_bigquery_op_trigger_terminated(self, mock_job_status,
caplog, insert_job_trigger):
+ """Test that BigQuery Triggers fire the correct event in case of an
error."""
+ # Set the status to a value other than success or pending
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_check_trigger_exception(mock_job_status, caplog):
- """
- Test that BigQuery Triggers fire the correct event in case of an error.
- """
- mock_job_status.side_effect = Exception("Test exception")
+ mock_job_status.return_value = "error"
- trigger = BigQueryCheckTrigger(
- conn_id=TEST_CONN_ID,
- job_id=TEST_JOB_ID,
- project_id=TEST_GCP_PROJECT_ID,
- dataset_id=TEST_DATASET_ID,
- table_id=TEST_TABLE_ID,
- poll_interval=POLLING_PERIOD_SECONDS,
- )
+ generator = insert_job_trigger.run()
+ actual = await generator.asend(None)
+ assert TriggerEvent({"status": "error", "message": "error"}) == actual
- generator = trigger.run()
- actual = await generator.asend(None)
- assert TriggerEvent({"status": "error", "message": "Test exception"}) ==
actual
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ async def test_bigquery_op_trigger_exception(self, mock_job_status,
caplog, insert_job_trigger):
+ """Test that BigQuery Triggers fire the correct event in case of an
error."""
+ mock_job_status.side_effect = Exception("Test exception")
+ generator = insert_job_trigger.run()
+ actual = await generator.asend(None)
+ assert TriggerEvent({"status": "error", "message": "Test exception"})
== actual
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_get_data_trigger_exception(mock_job_status, caplog):
- """
- Test that BigQuery Triggers fire the correct event in case of an error.
- """
- mock_job_status.side_effect = Exception("Test exception")
- trigger = BigQueryGetDataTrigger(
- conn_id=TEST_CONN_ID,
- job_id=TEST_JOB_ID,
- project_id=TEST_GCP_PROJECT_ID,
- dataset_id=TEST_DATASET_ID,
- table_id=TEST_TABLE_ID,
- poll_interval=POLLING_PERIOD_SECONDS,
- )
+class TestBigQueryGetDataTrigger:
+ def test_bigquery_get_data_trigger_serialization(self, get_data_trigger):
+ """Asserts that the BigQueryGetDataTrigger correctly serializes its
arguments and classpath."""
- generator = trigger.run()
- actual = await generator.asend(None)
- assert TriggerEvent({"status": "error", "message": "Test exception"}) ==
actual
+ classpath, kwargs = get_data_trigger.serialize()
+ assert classpath ==
"airflow.providers.google.cloud.triggers.bigquery.BigQueryGetDataTrigger"
+ assert kwargs == {
+ "conn_id": TEST_CONN_ID,
+ "job_id": TEST_JOB_ID,
+ "dataset_id": TEST_DATASET_ID,
+ "project_id": TEST_GCP_PROJECT_ID,
+ "table_id": TEST_TABLE_ID,
+ "poll_interval": POLLING_PERIOD_SECONDS,
+ }
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance")
+ async def test_bigquery_get_data_trigger_running(self, mock_job_instance,
caplog, get_data_trigger):
+ """Test that BigQuery Triggers do not fire while a query is still
running."""
-def test_bigquery_check_op_trigger_serialization():
- """
- Asserts that the BigQueryCheckTrigger correctly serializes its arguments
- and classpath.
- """
- trigger = BigQueryCheckTrigger(
- TEST_CONN_ID,
- TEST_JOB_ID,
- TEST_GCP_PROJECT_ID,
- TEST_DATASET_ID,
- TEST_TABLE_ID,
- POLLING_PERIOD_SECONDS,
- )
- classpath, kwargs = trigger.serialize()
- assert classpath ==
"airflow.providers.google.cloud.triggers.bigquery.BigQueryCheckTrigger"
- assert kwargs == {
- "conn_id": TEST_CONN_ID,
- "job_id": TEST_JOB_ID,
- "dataset_id": TEST_DATASET_ID,
- "project_id": TEST_GCP_PROJECT_ID,
- "table_id": TEST_TABLE_ID,
- "poll_interval": POLLING_PERIOD_SECONDS,
- }
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
-async def test_bigquery_check_op_trigger_success_with_data(mock_job_output,
mock_job_status):
- """
- Test the BigQueryCheckTrigger only fires once the query execution reaches
a successful state.
- """
- mock_job_status.return_value = "success"
- mock_job_output.return_value = {
- "kind": "bigquery#getQueryResultsResponse",
- "etag": "test_etag",
- "schema": {"fields": [{"name": "f0_", "type": "INTEGER", "mode":
"NULLABLE"}]},
- "jobReference": {
- "projectId": "test_airflow-providers",
- "jobId": "test_jobid",
- "location": "US",
- },
- "totalRows": "1",
- "rows": [{"f": [{"v": "22"}]}],
- "totalBytesProcessed": "0",
- "jobComplete": True,
- "cacheHit": False,
- }
-
- trigger = BigQueryCheckTrigger(
- TEST_CONN_ID,
- TEST_JOB_ID,
- TEST_GCP_PROJECT_ID,
- TEST_DATASET_ID,
- TEST_TABLE_ID,
- POLLING_PERIOD_SECONDS,
- )
+ mock_job_client = AsyncMock(Job)
+ mock_job_instance.return_value = mock_job_client
+ mock_job_instance.return_value.result.side_effect = OSError
+ caplog.set_level(logging.INFO)
- generator = trigger.run()
- actual = await generator.asend(None)
-
- assert TriggerEvent({"status": "success", "records": [22]}) == actual
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
-async def test_bigquery_check_op_trigger_success_without_data(mock_job_output,
mock_job_status):
- """
- Tests that BigQueryCheckTrigger sends TriggerEvent as { "status":
"success", "records": None}
- when no rows are available in the query result.
- """
- mock_job_status.return_value = "success"
- mock_job_output.return_value = {
- "kind": "bigquery#getQueryResultsResponse",
- "etag": "test_etag",
- "schema": {
- "fields": [
- {"name": "value", "type": "INTEGER", "mode": "NULLABLE"},
- {"name": "name", "type": "STRING", "mode": "NULLABLE"},
- {"name": "ds", "type": "DATE", "mode": "NULLABLE"},
- ]
- },
- "jobReference": {
- "projectId": "test_airflow-airflow-providers",
- "jobId": "test_jobid",
- "location": "US",
- },
- "totalRows": "0",
- "totalBytesProcessed": "0",
- "jobComplete": True,
- "cacheHit": False,
- }
-
- trigger = BigQueryCheckTrigger(
- TEST_CONN_ID,
- TEST_JOB_ID,
- TEST_GCP_PROJECT_ID,
- TEST_DATASET_ID,
- TEST_TABLE_ID,
- POLLING_PERIOD_SECONDS,
- )
- generator = trigger.run()
- actual = await generator.asend(None)
- assert TriggerEvent({"status": "success", "records": None}) == actual
+ task = asyncio.create_task(get_data_trigger.run().__anext__())
+ await asyncio.sleep(0.5)
+ # TriggerEvent was not returned
+ assert task.done() is False
-def test_bigquery_get_data_trigger_serialization():
- """
- Asserts that the BigQueryGetDataTrigger correctly serializes its arguments
- and classpath.
- """
- trigger = BigQueryGetDataTrigger(
- conn_id=TEST_CONN_ID,
- job_id=TEST_JOB_ID,
- project_id=TEST_GCP_PROJECT_ID,
- dataset_id=TEST_DATASET_ID,
- table_id=TEST_TABLE_ID,
- poll_interval=POLLING_PERIOD_SECONDS,
- )
- classpath, kwargs = trigger.serialize()
- assert classpath ==
"airflow.providers.google.cloud.triggers.bigquery.BigQueryGetDataTrigger"
- assert kwargs == {
- "conn_id": TEST_CONN_ID,
- "job_id": TEST_JOB_ID,
- "dataset_id": TEST_DATASET_ID,
- "project_id": TEST_GCP_PROJECT_ID,
- "table_id": TEST_TABLE_ID,
- "poll_interval": POLLING_PERIOD_SECONDS,
- }
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
-async def test_bigquery_get_data_trigger_success_with_data(mock_job_output,
mock_job_status):
- """
- Tests that BigQueryGetDataTrigger only fires once the query execution
reaches a successful state.
- """
- mock_job_status.return_value = "success"
- mock_job_output.return_value = {
- "kind": "bigquery#tableDataList",
- "etag": "test_etag",
- "schema": {
- "fields": [
- {"name": "f0_", "type": "INTEGER", "mode": "NULLABLE"},
- {"name": "f1_", "type": "STRING", "mode": "NULLABLE"},
- ]
- },
- "jobReference": {
- "projectId": "test-airflow-providers",
- "jobId": "test_jobid",
- "location": "US",
- },
- "totalRows": "10",
- "rows": [{"f": [{"v": "42"}, {"v": "monthy python"}]}, {"f": [{"v":
"42"}, {"v": "fishy fish"}]}],
- "totalBytesProcessed": "0",
- "jobComplete": True,
- "cacheHit": False,
- }
-
- trigger = BigQueryGetDataTrigger(
- TEST_CONN_ID,
- TEST_JOB_ID,
- TEST_GCP_PROJECT_ID,
- TEST_DATASET_ID,
- TEST_TABLE_ID,
- POLLING_PERIOD_SECONDS,
- )
+ assert "Query is still running..." in caplog.text
+ assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
+
+ # Prevents error when task is destroyed while in "pending" state
+ asyncio.get_event_loop().stop()
- generator = trigger.run()
- actual = await generator.asend(None)
- # # The extracted row will be parsed and formatted to retrieve the value
from the
- # # structure - 'rows":[{"f":[{"v":"42"},{"v":"monthy
python"}]},{"f":[{"v":"42"},{"v":"fishy fish"}]}]
-
- assert (
- TriggerEvent(
- {
- "status": "success",
- "message": "success",
- "records": [[42, "monthy python"], [42, "fishy fish"]],
- }
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ async def test_bigquery_get_data_trigger_terminated(self, mock_job_status,
caplog, get_data_trigger):
+ """Test that BigQuery Triggers fire the correct event in case of an
error."""
+ # Set the status to a value other than success or pending
+
+ mock_job_status.return_value = "error"
+
+ generator = get_data_trigger.run()
+ actual = await generator.asend(None)
+ assert TriggerEvent({"status": "error", "message": "error"}) == actual
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ async def test_bigquery_get_data_trigger_exception(self, mock_job_status,
caplog, get_data_trigger):
+ """Test that BigQuery Triggers fire the correct event in case of an
error."""
+ mock_job_status.side_effect = Exception("Test exception")
+
+ generator = get_data_trigger.run()
+ actual = await generator.asend(None)
+ assert TriggerEvent({"status": "error", "message": "Test exception"})
== actual
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+
@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
+ async def test_bigquery_get_data_trigger_success_with_data(
+ self, mock_job_output, mock_job_status, get_data_trigger
+ ):
+ """
+ Tests that BigQueryGetDataTrigger only fires once the query execution
reaches a successful state.
+ """
+ mock_job_status.return_value = "success"
+ mock_job_output.return_value = {
+ "kind": "bigquery#tableDataList",
+ "etag": "test_etag",
+ "schema": {
+ "fields": [
+ {"name": "f0_", "type": "INTEGER", "mode": "NULLABLE"},
+ {"name": "f1_", "type": "STRING", "mode": "NULLABLE"},
+ ]
+ },
+ "jobReference": {
+ "projectId": "test-airflow-providers",
+ "jobId": "test_jobid",
+ "location": "US",
+ },
+ "totalRows": "10",
+ "rows": [{"f": [{"v": "42"}, {"v": "monthy python"}]}, {"f":
[{"v": "42"}, {"v": "fishy fish"}]}],
+ "totalBytesProcessed": "0",
+ "jobComplete": True,
+ "cacheHit": False,
+ }
+
+ generator = get_data_trigger.run()
+ actual = await generator.asend(None)
+ # The extracted row will be parsed and formatted to retrieve the value
from the
+ # structure - 'rows":[{"f":[{"v":"42"},{"v":"monthy
python"}]},{"f":[{"v":"42"},{"v":"fishy fish"}]}]
+
+ assert (
+ TriggerEvent(
+ {
+ "status": "success",
+ "message": "success",
+ "records": [[42, "monthy python"], [42, "fishy fish"]],
+ }
+ )
+ == actual
)
- == actual
- )
- # Prevents error when task is destroyed while in "pending" state
- asyncio.get_event_loop().stop()
-
-
-def test_bigquery_interval_check_trigger_serialization():
- """
- Asserts that the BigQueryIntervalCheckTrigger correctly serializes its
arguments
- and classpath.
- """
- trigger = BigQueryIntervalCheckTrigger(
- TEST_CONN_ID,
- TEST_FIRST_JOB_ID,
- TEST_SECOND_JOB_ID,
- TEST_GCP_PROJECT_ID,
- TEST_TABLE_ID,
- TEST_METRIC_THRESHOLDS,
- TEST_DATE_FILTER_COLUMN,
- TEST_DAYS_BACK,
- TEST_RATIO_FORMULA,
- TEST_IGNORE_ZERO,
- TEST_DATASET_ID,
- TEST_TABLE_ID,
- POLLING_PERIOD_SECONDS,
- )
- classpath, kwargs = trigger.serialize()
- assert classpath ==
"airflow.providers.google.cloud.triggers.bigquery.BigQueryIntervalCheckTrigger"
- assert kwargs == {
- "conn_id": TEST_CONN_ID,
- "first_job_id": TEST_FIRST_JOB_ID,
- "second_job_id": TEST_SECOND_JOB_ID,
- "project_id": TEST_GCP_PROJECT_ID,
- "table": TEST_TABLE_ID,
- "metrics_thresholds": TEST_METRIC_THRESHOLDS,
- "date_filter_column": TEST_DATE_FILTER_COLUMN,
- "days_back": TEST_DAYS_BACK,
- "ratio_formula": TEST_RATIO_FORMULA,
- "ignore_zero": TEST_IGNORE_ZERO,
- }
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
-async def test_bigquery_interval_check_trigger_success(mock_get_job_output,
mock_job_status):
- """
- Tests the BigQueryIntervalCheckTrigger only fires once the query execution
reaches a successful state.
- """
- mock_job_status.return_value = "success"
- mock_get_job_output.return_value = ["0"]
-
- trigger = BigQueryIntervalCheckTrigger(
- conn_id=TEST_CONN_ID,
- first_job_id=TEST_FIRST_JOB_ID,
- second_job_id=TEST_SECOND_JOB_ID,
- project_id=TEST_GCP_PROJECT_ID,
- table=TEST_TABLE_ID,
- metrics_thresholds=TEST_METRIC_THRESHOLDS,
- date_filter_column=TEST_DATE_FILTER_COLUMN,
- days_back=TEST_DAYS_BACK,
- ratio_formula=TEST_RATIO_FORMULA,
- ignore_zero=TEST_IGNORE_ZERO,
- dataset_id=TEST_DATASET_ID,
- table_id=TEST_TABLE_ID,
- poll_interval=POLLING_PERIOD_SECONDS,
- )
+ # Prevents error when task is destroyed while in "pending" state
+ asyncio.get_event_loop().stop()
- generator = trigger.run()
- actual = await generator.asend(None)
- assert actual == TriggerEvent({"status": "error", "message": "The second
SQL query returned None"})
+class TestBigQueryCheckTrigger:
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance")
+ async def test_bigquery_check_trigger_running(self, mock_job_instance,
caplog, check_trigger):
+ """Test that BigQuery Triggers do not fire while a query is still
running."""
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_interval_check_trigger_pending(mock_job_status,
caplog):
- """
- Tests that the BigQueryIntervalCheckTrigger do not fire while a query is
still running.
- """
- mock_job_status.return_value = "pending"
- caplog.set_level(logging.INFO)
+ mock_job_client = AsyncMock(Job)
+ mock_job_instance.return_value = mock_job_client
+ mock_job_instance.return_value.result.side_effect = OSError
+ caplog.set_level(logging.INFO)
- trigger = BigQueryIntervalCheckTrigger(
- conn_id=TEST_CONN_ID,
- first_job_id=TEST_FIRST_JOB_ID,
- second_job_id=TEST_SECOND_JOB_ID,
- project_id=TEST_GCP_PROJECT_ID,
- table=TEST_TABLE_ID,
- metrics_thresholds=TEST_METRIC_THRESHOLDS,
- date_filter_column=TEST_DATE_FILTER_COLUMN,
- days_back=TEST_DAYS_BACK,
- ratio_formula=TEST_RATIO_FORMULA,
- ignore_zero=TEST_IGNORE_ZERO,
- dataset_id=TEST_DATASET_ID,
- table_id=TEST_TABLE_ID,
- poll_interval=POLLING_PERIOD_SECONDS,
- )
- task = asyncio.create_task(trigger.run().__anext__())
- await asyncio.sleep(0.5)
+ task = asyncio.create_task(check_trigger.run().__anext__())
+ await asyncio.sleep(0.5)
- # TriggerEvent was not returned
- assert task.done() is False
+ # TriggerEvent was not returned
+ assert task.done() is False
- assert f"Using the connection {TEST_CONN_ID} ." in caplog.text
+ assert "Query is still running..." in caplog.text
+ assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
- assert "Query is still running..." in caplog.text
- assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
+ # Prevents error when task is destroyed while in "pending" state
+ asyncio.get_event_loop().stop()
- # Prevents error when task is destroyed while in "pending" state
- asyncio.get_event_loop().stop()
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ async def test_bigquery_check_trigger_terminated(self, mock_job_status,
caplog, check_trigger):
+ """Test that BigQuery Triggers fire the correct event in case of an
error."""
+ # Set the status to a value other than success or pending
+ mock_job_status.return_value = "error"
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_interval_check_trigger_terminated(mock_job_status):
- """
- Tests the BigQueryIntervalCheckTrigger fires the correct event in case of
an error.
- """
- # Set the status to a value other than success or pending
- mock_job_status.return_value = "error"
- trigger = BigQueryIntervalCheckTrigger(
- conn_id=TEST_CONN_ID,
- first_job_id=TEST_FIRST_JOB_ID,
- second_job_id=TEST_SECOND_JOB_ID,
- project_id=TEST_GCP_PROJECT_ID,
- table=TEST_TABLE_ID,
- metrics_thresholds=TEST_METRIC_THRESHOLDS,
- date_filter_column=TEST_DATE_FILTER_COLUMN,
- days_back=TEST_DAYS_BACK,
- ratio_formula=TEST_RATIO_FORMULA,
- ignore_zero=TEST_IGNORE_ZERO,
- dataset_id=TEST_DATASET_ID,
- table_id=TEST_TABLE_ID,
- poll_interval=POLLING_PERIOD_SECONDS,
- )
+ generator = check_trigger.run()
+ actual = await generator.asend(None)
+ assert TriggerEvent({"status": "error", "message": "error"}) == actual
- generator = trigger.run()
- actual = await generator.asend(None)
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ async def test_check_trigger_exception(self, mock_job_status, caplog,
check_trigger):
+ """Test that BigQuery Triggers fire the correct event in case of an
error."""
+ mock_job_status.side_effect = Exception("Test exception")
- assert TriggerEvent({"status": "error", "message": "error", "data": None})
== actual
+ generator = check_trigger.run()
+ actual = await generator.asend(None)
+ assert TriggerEvent({"status": "error", "message": "Test exception"})
== actual
+ def test_check_trigger_serialization(self, check_trigger):
+ """Asserts that the BigQueryCheckTrigger correctly serializes its
arguments and classpath."""
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_interval_check_trigger_exception(mock_job_status,
caplog):
- """
- Tests that the BigQueryIntervalCheckTrigger fires the correct event in
case of an error.
- """
- mock_job_status.side_effect = Exception("Test exception")
- caplog.set_level(logging.DEBUG)
+ classpath, kwargs = check_trigger.serialize()
+ assert classpath ==
"airflow.providers.google.cloud.triggers.bigquery.BigQueryCheckTrigger"
+ assert kwargs == {
+ "conn_id": TEST_CONN_ID,
+ "job_id": TEST_JOB_ID,
+ "dataset_id": TEST_DATASET_ID,
+ "project_id": TEST_GCP_PROJECT_ID,
+ "table_id": TEST_TABLE_ID,
+ "poll_interval": POLLING_PERIOD_SECONDS,
+ }
- trigger = BigQueryIntervalCheckTrigger(
- conn_id=TEST_CONN_ID,
- first_job_id=TEST_FIRST_JOB_ID,
- second_job_id=TEST_SECOND_JOB_ID,
- project_id=TEST_GCP_PROJECT_ID,
- table=TEST_TABLE_ID,
- metrics_thresholds=TEST_METRIC_THRESHOLDS,
- date_filter_column=TEST_DATE_FILTER_COLUMN,
- days_back=TEST_DAYS_BACK,
- ratio_formula=TEST_RATIO_FORMULA,
- ignore_zero=TEST_IGNORE_ZERO,
- dataset_id=TEST_DATASET_ID,
- table_id=TEST_TABLE_ID,
- poll_interval=POLLING_PERIOD_SECONDS,
- )
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
+ async def test_check_trigger_success_with_data(self, mock_job_output,
mock_job_status, check_trigger):
+ """
+ Test the BigQueryCheckTrigger only fires once the query execution
reaches a successful state.
+ """
+ mock_job_status.return_value = "success"
+ mock_job_output.return_value = {
+ "kind": "bigquery#getQueryResultsResponse",
+ "etag": "test_etag",
+ "schema": {"fields": [{"name": "f0_", "type": "INTEGER", "mode":
"NULLABLE"}]},
+ "jobReference": {
+ "projectId": "test_airflow-providers",
+ "jobId": "test_jobid",
+ "location": "US",
+ },
+ "totalRows": "1",
+ "rows": [{"f": [{"v": "22"}]}],
+ "totalBytesProcessed": "0",
+ "jobComplete": True,
+ "cacheHit": False,
+ }
- # trigger event is yielded so it creates a generator object
- # so i have used async for to get all the values and added it to task
- task = [i async for i in trigger.run()]
- # since we use return as soon as we yield the trigger event
- # at any given point there should be one trigger event returned to the task
- # so we validate for length of task to be 1
+ generator = check_trigger.run()
+ actual = await generator.asend(None)
- assert len(task) == 1
- assert TriggerEvent({"status": "error", "message": "Test exception"}) in
task
+ assert TriggerEvent({"status": "success", "records": [22]}) == actual
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
+ async def test_check_trigger_success_without_data(self, mock_job_output,
mock_job_status, check_trigger):
+ """
+ Tests that BigQueryCheckTrigger sends TriggerEvent as { "status":
"success", "records": None}
+ when no rows are available in the query result.
+ """
+ mock_job_status.return_value = "success"
+ mock_job_output.return_value = {
+ "kind": "bigquery#getQueryResultsResponse",
+ "etag": "test_etag",
+ "schema": {
+ "fields": [
+ {"name": "value", "type": "INTEGER", "mode": "NULLABLE"},
+ {"name": "name", "type": "STRING", "mode": "NULLABLE"},
+ {"name": "ds", "type": "DATE", "mode": "NULLABLE"},
+ ]
+ },
+ "jobReference": {
+ "projectId": "test_airflow-airflow-providers",
+ "jobId": "test_jobid",
+ "location": "US",
+ },
+ "totalRows": "0",
+ "totalBytesProcessed": "0",
+ "jobComplete": True,
+ "cacheHit": False,
+ }
-def test_bigquery_value_check_op_trigger_serialization():
- """
- Asserts that the BigQueryValueCheckTrigger correctly serializes its
arguments
- and classpath.
- """
+ generator = check_trigger.run()
+ actual = await generator.asend(None)
+ assert TriggerEvent({"status": "success", "records": None}) == actual
- trigger = BigQueryValueCheckTrigger(
- conn_id=TEST_CONN_ID,
- pass_value=TEST_PASS_VALUE,
- job_id=TEST_JOB_ID,
- dataset_id=TEST_DATASET_ID,
- project_id=TEST_GCP_PROJECT_ID,
- sql=TEST_SQL_QUERY,
- table_id=TEST_TABLE_ID,
- tolerance=TEST_TOLERANCE,
- poll_interval=POLLING_PERIOD_SECONDS,
- )
- classpath, kwargs = trigger.serialize()
-
- assert classpath ==
"airflow.providers.google.cloud.triggers.bigquery.BigQueryValueCheckTrigger"
- assert kwargs == {
- "conn_id": TEST_CONN_ID,
- "pass_value": TEST_PASS_VALUE,
- "job_id": TEST_JOB_ID,
- "dataset_id": TEST_DATASET_ID,
- "project_id": TEST_GCP_PROJECT_ID,
- "sql": TEST_SQL_QUERY,
- "table_id": TEST_TABLE_ID,
- "tolerance": TEST_TOLERANCE,
- "poll_interval": POLLING_PERIOD_SECONDS,
- }
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_records")
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_value_check_op_trigger_success(mock_job_status,
get_job_output, get_records):
- """
- Tests that the BigQueryValueCheckTrigger only fires once the query
execution reaches a successful state.
- """
- mock_job_status.return_value = "success"
- get_job_output.return_value = {}
- get_records.return_value = [[2], [4]]
-
- trigger = BigQueryValueCheckTrigger(
- conn_id=TEST_CONN_ID,
- pass_value=TEST_PASS_VALUE,
- job_id=TEST_JOB_ID,
- dataset_id=TEST_DATASET_ID,
- project_id=TEST_GCP_PROJECT_ID,
- sql=TEST_SQL_QUERY,
- table_id=TEST_TABLE_ID,
- tolerance=TEST_TOLERANCE,
- poll_interval=POLLING_PERIOD_SECONDS,
- )
- asyncio.create_task(trigger.run().__anext__())
- await asyncio.sleep(0.5)
+class TestBigQueryIntervalCheckTrigger:
+ def test_interval_check_trigger_serialization(self,
interval_check_trigger):
+ """
+ Asserts that the BigQueryIntervalCheckTrigger correctly serializes its
arguments and classpath.
+ """
+
+ classpath, kwargs = interval_check_trigger.serialize()
+ assert classpath ==
"airflow.providers.google.cloud.triggers.bigquery.BigQueryIntervalCheckTrigger"
+ assert kwargs == {
+ "conn_id": TEST_CONN_ID,
+ "first_job_id": TEST_FIRST_JOB_ID,
+ "second_job_id": TEST_SECOND_JOB_ID,
+ "project_id": TEST_GCP_PROJECT_ID,
+ "table": TEST_TABLE_ID,
+ "metrics_thresholds": TEST_METRIC_THRESHOLDS,
+ "date_filter_column": TEST_DATE_FILTER_COLUMN,
+ "days_back": TEST_DAYS_BACK,
+ "ratio_formula": TEST_RATIO_FORMULA,
+ "ignore_zero": TEST_IGNORE_ZERO,
+ }
- generator = trigger.run()
- actual = await generator.asend(None)
- assert actual == TriggerEvent({"status": "success", "message": "Job
completed", "records": [4]})
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
+ async def test_interval_check_trigger_success(
+ self, mock_get_job_output, mock_job_status, interval_check_trigger
+ ):
+ """
+ Tests the BigQueryIntervalCheckTrigger only fires once the query
execution reaches a successful state.
+ """
+ mock_job_status.return_value = "success"
+ mock_get_job_output.return_value = ["0"]
+ generator = interval_check_trigger.run()
+ actual = await generator.asend(None)
+ assert actual == TriggerEvent({"status": "error", "message": "The
second SQL query returned None"})
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_value_check_op_trigger_pending(mock_job_status,
caplog):
- """
- Tests that the BigQueryValueCheckTrigger only fires once the query
execution reaches a successful state.
- """
- mock_job_status.return_value = "pending"
- caplog.set_level(logging.INFO)
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ async def test_interval_check_trigger_pending(self, mock_job_status,
caplog, interval_check_trigger):
+ """
+ Tests that the BigQueryIntervalCheckTrigger do not fire while a query
is still running.
+ """
+ mock_job_status.return_value = "pending"
+ caplog.set_level(logging.INFO)
- trigger = BigQueryValueCheckTrigger(
- TEST_CONN_ID,
- TEST_PASS_VALUE,
- TEST_JOB_ID,
- TEST_DATASET_ID,
- TEST_GCP_PROJECT_ID,
- TEST_SQL_QUERY,
- TEST_TABLE_ID,
- TEST_TOLERANCE,
- POLLING_PERIOD_SECONDS,
- )
+ task = asyncio.create_task(interval_check_trigger.run().__anext__())
+ await asyncio.sleep(0.5)
- task = asyncio.create_task(trigger.run().__anext__())
- await asyncio.sleep(0.5)
+ # TriggerEvent was not returned
+ assert task.done() is False
- # TriggerEvent was returned
- assert task.done() is False
+ assert "Query is still running..." in caplog.text
+ assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
- assert "Query is still running..." in caplog.text
+ # Prevents error when task is destroyed while in "pending" state
+ asyncio.get_event_loop().stop()
- assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ async def test_interval_check_trigger_terminated(self, mock_job_status,
interval_check_trigger):
+ """Tests the BigQueryIntervalCheckTrigger fires the correct event in
case of an error."""
+ # Set the status to a value other than success or pending
+ mock_job_status.return_value = "error"
- # Prevents error when task is destroyed while in "pending" state
- asyncio.get_event_loop().stop()
+ generator = interval_check_trigger.run()
+ actual = await generator.asend(None)
+ assert TriggerEvent({"status": "error", "message": "error", "data":
None}) == actual
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_value_check_op_trigger_fail(mock_job_status):
- """
- Tests that the BigQueryValueCheckTrigger only fires once the query
execution reaches a successful state.
- """
- mock_job_status.return_value = "dummy"
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ async def test_interval_check_trigger_exception(self, mock_job_status,
caplog, interval_check_trigger):
+ """Tests that the BigQueryIntervalCheckTrigger fires the correct event
in case of an error."""
+ mock_job_status.side_effect = Exception("Test exception")
+ caplog.set_level(logging.DEBUG)
- trigger = BigQueryValueCheckTrigger(
- TEST_CONN_ID,
- TEST_PASS_VALUE,
- TEST_JOB_ID,
- TEST_DATASET_ID,
- TEST_GCP_PROJECT_ID,
- TEST_SQL_QUERY,
- TEST_TABLE_ID,
- TEST_TOLERANCE,
- POLLING_PERIOD_SECONDS,
- )
+ # trigger event is yielded so it creates a generator object
+ # so i have used async for to get all the values and added it to task
+ task = [i async for i in interval_check_trigger.run()]
+ # since we use return as soon as we yield the trigger event
+ # at any given point there should be one trigger event returned to the
task
+ # so we validate for length of task to be 1
- generator = trigger.run()
- actual = await generator.asend(None)
- assert TriggerEvent({"status": "error", "message": "dummy", "records":
None}) == actual
+ assert len(task) == 1
+ assert TriggerEvent({"status": "error", "message": "Test exception"})
in task
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_value_check_trigger_exception(mock_job_status):
- """
- Tests the BigQueryValueCheckTrigger does not fire if there is an exception.
- """
- mock_job_status.side_effect = Exception("Test exception")
+class TestBigQueryValueCheckTrigger:
+ def test_bigquery_value_check_op_trigger_serialization(self,
value_check_trigger):
+ """Asserts that the BigQueryValueCheckTrigger correctly serializes its
arguments and classpath."""
- trigger = BigQueryValueCheckTrigger(
- conn_id=TEST_CONN_ID,
- sql=TEST_SQL_QUERY,
- pass_value=TEST_PASS_VALUE,
- tolerance=1,
- job_id=TEST_JOB_ID,
- project_id=TEST_GCP_PROJECT_ID,
- )
+ classpath, kwargs = value_check_trigger.serialize()
- # trigger event is yielded so it creates a generator object
- # so i have used async for to get all the values and added it to task
- task = [i async for i in trigger.run()]
- # since we use return as soon as we yield the trigger event
- # at any given point there should be one trigger event returned to the task
- # so we validate for length of task to be 1
+ assert classpath ==
"airflow.providers.google.cloud.triggers.bigquery.BigQueryValueCheckTrigger"
+ assert kwargs == {
+ "conn_id": TEST_CONN_ID,
+ "pass_value": TEST_PASS_VALUE,
+ "job_id": TEST_JOB_ID,
+ "dataset_id": TEST_DATASET_ID,
+ "project_id": TEST_GCP_PROJECT_ID,
+ "sql": TEST_SQL_QUERY,
+ "table_id": TEST_TABLE_ID,
+ "tolerance": TEST_TOLERANCE,
+ "poll_interval": POLLING_PERIOD_SECONDS,
+ }
- assert len(task) == 1
- assert TriggerEvent({"status": "error", "message": "Test exception"}) in
task
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_records")
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ async def test_value_check_op_trigger_success(
+ self, mock_job_status, get_job_output, get_records, value_check_trigger
+ ):
+ """
+ Tests BigQueryValueCheckTrigger only fires once the query execution
reaches a successful state.
+ """
+ mock_job_status.return_value = "success"
+ get_job_output.return_value = {}
+ get_records.return_value = [[2], [4]]
+ asyncio.create_task(value_check_trigger.run().__anext__())
+ await asyncio.sleep(0.5)
-def test_big_query_table_existence_trigger_serialization():
- """
- Asserts that the BigQueryTableExistenceTrigger correctly serializes its
arguments
- and classpath.
- """
- trigger = BigQueryTableExistenceTrigger(
- TEST_GCP_PROJECT_ID,
- TEST_DATASET_ID,
- TEST_TABLE_ID,
- TEST_GCP_CONN_ID,
- TEST_HOOK_PARAMS,
- POLLING_PERIOD_SECONDS,
- )
- classpath, kwargs = trigger.serialize()
- assert classpath ==
"airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger"
- assert kwargs == {
- "dataset_id": TEST_DATASET_ID,
- "project_id": TEST_GCP_PROJECT_ID,
- "table_id": TEST_TABLE_ID,
- "gcp_conn_id": TEST_GCP_CONN_ID,
- "poll_interval": POLLING_PERIOD_SECONDS,
- "hook_params": TEST_HOOK_PARAMS,
- }
-
-
-@pytest.mark.asyncio
-@async_mock.patch(
-
"airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger._table_exists"
-)
-async def test_big_query_table_existence_trigger_success(mock_table_exists):
- """
- Tests success case BigQueryTableExistenceTrigger
- """
- mock_table_exists.return_value = True
+ generator = value_check_trigger.run()
+ actual = await generator.asend(None)
+ assert actual == TriggerEvent({"status": "success", "message": "Job
completed", "records": [4]})
- trigger = BigQueryTableExistenceTrigger(
- TEST_GCP_PROJECT_ID,
- TEST_DATASET_ID,
- TEST_TABLE_ID,
- TEST_GCP_CONN_ID,
- TEST_HOOK_PARAMS,
- POLLING_PERIOD_SECONDS,
- )
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ async def test_value_check_op_trigger_pending(self, mock_job_status,
caplog, value_check_trigger):
+ """
+ Tests BigQueryValueCheckTrigger only fires once the query execution
reaches a successful state.
+ """
+ mock_job_status.return_value = "pending"
+ caplog.set_level(logging.INFO)
- generator = trigger.run()
- actual = await generator.asend(None)
- assert TriggerEvent({"status": "success", "message": "success"}) == actual
+ task = asyncio.create_task(value_check_trigger.run().__anext__())
+ await asyncio.sleep(0.5)
+ # TriggerEvent was returned
+ assert task.done() is False
-@pytest.mark.asyncio
-@async_mock.patch(
-
"airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger._table_exists"
-)
-async def test_big_query_table_existence_trigger_pending(mock_table_exists):
- """
- Test that BigQueryTableExistenceTrigger is in loop till the table exist.
- """
- mock_table_exists.return_value = False
+ assert "Query is still running..." in caplog.text
- trigger = BigQueryTableExistenceTrigger(
- TEST_GCP_PROJECT_ID,
- TEST_DATASET_ID,
- TEST_TABLE_ID,
- TEST_GCP_CONN_ID,
- TEST_HOOK_PARAMS,
- POLLING_PERIOD_SECONDS,
- )
- task = asyncio.create_task(trigger.run().__anext__())
- await asyncio.sleep(0.5)
+ assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
- # TriggerEvent was not returned
- assert task.done() is False
- asyncio.get_event_loop().stop()
+ # Prevents error when task is destroyed while in "pending" state
+ asyncio.get_event_loop().stop()
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ async def test_value_check_op_trigger_fail(self, mock_job_status,
value_check_trigger):
+ """
+ Tests BigQueryValueCheckTrigger only fires once the query execution
reaches a successful state.
+ """
+ mock_job_status.return_value = "dummy"
+
+ generator = value_check_trigger.run()
+ actual = await generator.asend(None)
+ assert TriggerEvent({"status": "error", "message": "dummy", "records":
None}) == actual
+
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+ async def test_value_check_trigger_exception(self, mock_job_status):
+ """Tests the BigQueryValueCheckTrigger does not fire if there is an
exception."""
+ mock_job_status.side_effect = Exception("Test exception")
+
+ trigger = BigQueryValueCheckTrigger(
+ conn_id=TEST_CONN_ID,
+ sql=TEST_SQL_QUERY,
+ pass_value=TEST_PASS_VALUE,
+ tolerance=1,
+ job_id=TEST_JOB_ID,
+ project_id=TEST_GCP_PROJECT_ID,
+ )
-@pytest.mark.asyncio
-@async_mock.patch(
-
"airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger._table_exists"
-)
-async def test_big_query_table_existence_trigger_exception(mock_table_exists):
- """
- Test BigQueryTableExistenceTrigger throws exception if any error.
- """
- mock_table_exists.side_effect = AsyncMock(side_effect=Exception("Test
exception"))
+ # trigger event is yielded so it creates a generator object
+ # so i have used async for to get all the values and added it to task
+ task = [i async for i in trigger.run()]
+ # since we use return as soon as we yield the trigger event
+ # at any given point there should be one trigger event returned to the
task
+ # so we validate for length of task to be 1
- trigger = BigQueryTableExistenceTrigger(
- TEST_GCP_PROJECT_ID,
- TEST_DATASET_ID,
- TEST_TABLE_ID,
- TEST_GCP_CONN_ID,
- TEST_HOOK_PARAMS,
- POLLING_PERIOD_SECONDS,
- )
- task = [i async for i in trigger.run()]
- assert len(task) == 1
- assert TriggerEvent({"status": "error", "message": "Test exception"}) in
task
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook.get_table_client")
-async def test_table_exists(mock_get_table_client):
- """Test BigQueryTableExistenceTrigger._table_exists async function with
mocked value
- and mocked return value"""
- hook = BigQueryTableAsyncHook()
- mock_get_table_client.return_value = AsyncMock(Table)
- trigger = BigQueryTableExistenceTrigger(
- TEST_GCP_PROJECT_ID,
- TEST_DATASET_ID,
- TEST_TABLE_ID,
- TEST_GCP_CONN_ID,
- TEST_HOOK_PARAMS,
- POLLING_PERIOD_SECONDS,
- )
- res = await trigger._table_exists(hook, TEST_DATASET_ID, TEST_TABLE_ID,
TEST_GCP_PROJECT_ID)
- assert res is True
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook.get_table_client")
-async def test_table_exists_exception(mock_get_table_client):
- """Test BigQueryTableExistenceTrigger._table_exists async function with
exception and return False"""
- hook = BigQueryTableAsyncHook()
- mock_get_table_client.side_effect = ClientResponseError(
- history=(),
- request_info=RequestInfo(
- headers=CIMultiDict(),
- real_url=URL("https://example.com"),
- method="GET",
- url=URL("https://example.com"),
- ),
- status=404,
- message="Not Found",
- )
- trigger = BigQueryTableExistenceTrigger(
- TEST_GCP_PROJECT_ID,
- TEST_DATASET_ID,
- TEST_TABLE_ID,
- TEST_GCP_CONN_ID,
- TEST_HOOK_PARAMS,
- POLLING_PERIOD_SECONDS,
- )
- res = await trigger._table_exists(hook, TEST_DATASET_ID, TEST_TABLE_ID,
TEST_GCP_PROJECT_ID)
- expected_response = False
- assert res == expected_response
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook.get_table_client")
-async def test_table_exists_raise_exception(mock_get_table_client):
- """Test BigQueryTableExistenceTrigger._table_exists async function with
raise exception"""
- hook = BigQueryTableAsyncHook()
- mock_get_table_client.side_effect = ClientResponseError(
- history=(),
- request_info=RequestInfo(
- headers=CIMultiDict(),
- real_url=URL("https://example.com"),
- method="GET",
- url=URL("https://example.com"),
- ),
- status=400,
- message="Not Found",
- )
- trigger = BigQueryTableExistenceTrigger(
- TEST_GCP_PROJECT_ID,
- TEST_DATASET_ID,
- TEST_TABLE_ID,
- TEST_GCP_CONN_ID,
- TEST_HOOK_PARAMS,
- POLLING_PERIOD_SECONDS,
- )
- with pytest.raises(ClientResponseError):
- await trigger._table_exists(hook, TEST_DATASET_ID, TEST_TABLE_ID,
TEST_GCP_PROJECT_ID)
+ assert len(task) == 1
+ assert TriggerEvent({"status": "error", "message": "Test exception"})
in task
+
+
+class TestBigQueryTableExistenceTrigger:
+ def test_table_existence_trigger_serialization(self,
table_existence_trigger):
+ """
+ Asserts that the BigQueryTableExistenceTrigger correctly serializes
its arguments and classpath.
+ """
+
+ classpath, kwargs = table_existence_trigger.serialize()
+ assert classpath ==
"airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger"
+ assert kwargs == {
+ "dataset_id": TEST_DATASET_ID,
+ "project_id": TEST_GCP_PROJECT_ID,
+ "table_id": TEST_TABLE_ID,
+ "gcp_conn_id": TEST_GCP_CONN_ID,
+ "poll_interval": POLLING_PERIOD_SECONDS,
+ "hook_params": TEST_HOOK_PARAMS,
+ }
+
+ @pytest.mark.asyncio
+ @async_mock.patch(
+
"airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger._table_exists"
+ )
+ async def test_big_query_table_existence_trigger_success(
+ self, mock_table_exists, table_existence_trigger
+ ):
+ """Tests success case BigQueryTableExistenceTrigger"""
+ mock_table_exists.return_value = True
+
+ generator = table_existence_trigger.run()
+ actual = await generator.asend(None)
+ assert TriggerEvent({"status": "success", "message": "success"}) ==
actual
+
+ @pytest.mark.asyncio
+ @async_mock.patch(
+
"airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger._table_exists"
+ )
+ async def test_table_existence_trigger_pending(self, mock_table_exists,
table_existence_trigger):
+ """Test that BigQueryTableExistenceTrigger is in loop till the table
exist."""
+ mock_table_exists.return_value = False
+
+ task = asyncio.create_task(table_existence_trigger.run().__anext__())
+ await asyncio.sleep(0.5)
+
+ # TriggerEvent was not returned
+ assert task.done() is False
+ asyncio.get_event_loop().stop()
+
+ @pytest.mark.asyncio
+ @async_mock.patch(
+
"airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger._table_exists"
+ )
+ async def test_table_existence_trigger_exception(self, mock_table_exists,
table_existence_trigger):
+ """Test BigQueryTableExistenceTrigger throws exception if any error."""
+ mock_table_exists.side_effect = AsyncMock(side_effect=Exception("Test
exception"))
+
+ task = [i async for i in table_existence_trigger.run()]
+ assert len(task) == 1
+ assert TriggerEvent({"status": "error", "message": "Test exception"})
in task
+
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook.get_table_client")
+ async def test_table_exists(self, mock_get_table_client,
table_existence_trigger):
+ """Test BigQueryTableExistenceTrigger._table_exists async function
with mocked value
+ and mocked return value"""
+ hook = BigQueryTableAsyncHook()
+ mock_get_table_client.return_value = AsyncMock(Table)
+
+ res = await table_existence_trigger._table_exists(
+ hook, TEST_DATASET_ID, TEST_TABLE_ID, TEST_GCP_PROJECT_ID
+ )
+ assert res is True
+
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook.get_table_client")
+ async def test_table_exists_exception(self, mock_get_table_client,
table_existence_trigger):
+ """Test BigQueryTableExistenceTrigger._table_exists async function
with exception and return False"""
+ hook = BigQueryTableAsyncHook()
+ mock_get_table_client.side_effect = ClientResponseError(
+ history=(),
+ request_info=RequestInfo(
+ headers=CIMultiDict(),
+ real_url=URL("https://example.com"),
+ method="GET",
+ url=URL("https://example.com"),
+ ),
+ status=404,
+ message="Not Found",
+ )
+
+ res = await table_existence_trigger._table_exists(
+ hook, TEST_DATASET_ID, TEST_TABLE_ID, TEST_GCP_PROJECT_ID
+ )
+ expected_response = False
+ assert res == expected_response
+
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook.get_table_client")
+ async def test_table_exists_raise_exception(self, mock_get_table_client,
table_existence_trigger):
+ """Test BigQueryTableExistenceTrigger._table_exists async function
with raise exception"""
+ hook = BigQueryTableAsyncHook()
+ mock_get_table_client.side_effect = ClientResponseError(
+ history=(),
+ request_info=RequestInfo(
+ headers=CIMultiDict(),
+ real_url=URL("https://example.com"),
+ method="GET",
+ url=URL("https://example.com"),
+ ),
+ status=400,
+ message="Not Found",
+ )
+
+ with pytest.raises(ClientResponseError):
+ await table_existence_trigger._table_exists(
+ hook, TEST_DATASET_ID, TEST_TABLE_ID, TEST_GCP_PROJECT_ID
+ )
class TestBigQueryTablePartitionExistenceTrigger:
- def
test_big_query_table_existence_partition_trigger_serialization_should_execute_successfully(self):
+ def test_serialization_successfully(self):
"""
Asserts that the BigQueryTablePartitionExistenceTrigger correctly
serializes its arguments
and classpath.
diff --git a/tests/providers/google/cloud/triggers/test_bigquery_dts.py
b/tests/providers/google/cloud/triggers/test_bigquery_dts.py
index 46074a6a54..866e4756cf 100644
--- a/tests/providers/google/cloud/triggers/test_bigquery_dts.py
+++ b/tests/providers/google/cloud/triggers/test_bigquery_dts.py
@@ -19,18 +19,13 @@ from __future__ import annotations
import asyncio
import logging
-import sys
import pytest
from google.cloud.bigquery_datatransfer_v1 import TransferState
from airflow.providers.google.cloud.triggers.bigquery_dts import
BigQueryDataTransferRunTrigger
from airflow.triggers.base import TriggerEvent
-
-if sys.version_info < (3, 8):
- from asynctest import mock
-else:
- from unittest import mock
+from tests.providers.google.cloud.utils.compat import async_mock
PROJECT_ID = "test-project-id"
CONFIG_ID = "test-config-id"
@@ -91,9 +86,9 @@ class TestBigQueryDataTransferRunTrigger:
assert actual_value == expected_value
@pytest.mark.asyncio
-
@mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
+
@async_mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
async def test_run_returns_success_event(self, mock_hook, trigger):
- mock_hook.return_value = mock.MagicMock(state=TransferState.SUCCEEDED)
+ mock_hook.return_value =
async_mock.MagicMock(state=TransferState.SUCCEEDED)
expected_event = TriggerEvent(
{
"run_id": RUN_ID,
@@ -107,9 +102,9 @@ class TestBigQueryDataTransferRunTrigger:
assert actual_event == expected_event
@pytest.mark.asyncio
-
@mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
+
@async_mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
async def test_run_returns_failed_event(self, mock_hook, trigger):
- mock_hook.return_value = mock.MagicMock(state=TransferState.FAILED)
+ mock_hook.return_value =
async_mock.MagicMock(state=TransferState.FAILED)
expected_event = TriggerEvent(
{
"status": "failed",
@@ -122,7 +117,7 @@ class TestBigQueryDataTransferRunTrigger:
assert actual_event == expected_event
@pytest.mark.asyncio
-
@mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
+
@async_mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
async def test_run_returns_exception_event(self, mock_hook, trigger):
error_msg = "test error msg"
mock_hook.side_effect = Exception(error_msg)
@@ -137,9 +132,9 @@ class TestBigQueryDataTransferRunTrigger:
assert actual_event == expected_event
@pytest.mark.asyncio
-
@mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
+
@async_mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
async def test_run_returns_cancelled_event(self, mock_hook, trigger):
- mock_hook.return_value = mock.MagicMock(state=TransferState.CANCELLED)
+ mock_hook.return_value =
async_mock.MagicMock(state=TransferState.CANCELLED)
expected_event = TriggerEvent(
{
"status": "cancelled",
@@ -152,9 +147,9 @@ class TestBigQueryDataTransferRunTrigger:
assert actual_event == expected_event
@pytest.mark.asyncio
-
@mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
+
@async_mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
async def test_run_loop_is_still_running(self, mock_hook, trigger, caplog):
- mock_hook.return_value = mock.MagicMock(state=TransferState.RUNNING)
+ mock_hook.return_value =
async_mock.MagicMock(state=TransferState.RUNNING)
caplog.set_level(logging.INFO)
diff --git a/tests/providers/google/cloud/triggers/test_datafusion.py
b/tests/providers/google/cloud/triggers/test_datafusion.py
index 907cec84fd..716ebc39a7 100644
--- a/tests/providers/google/cloud/triggers/test_datafusion.py
+++ b/tests/providers/google/cloud/triggers/test_datafusion.py
@@ -18,17 +18,12 @@ from __future__ import annotations
import asyncio
import logging
-import sys
import pytest
from airflow.providers.google.cloud.triggers.datafusion import
DataFusionStartPipelineTrigger
from airflow.triggers.base import TriggerEvent
-
-if sys.version_info < (3, 8):
- from asynctest import mock
-else:
- from unittest import mock
+from tests.providers.google.cloud.utils.compat import async_mock
HOOK_STATUS_STR =
"airflow.providers.google.cloud.hooks.datafusion.DataFusionAsyncHook.get_pipeline_status"
CLASSPATH =
"airflow.providers.google.cloud.triggers.datafusion.DataFusionStartPipelineTrigger"
@@ -78,7 +73,7 @@ class TestDataFusionStartPipelineTrigger:
}
@pytest.mark.asyncio
- @mock.patch(HOOK_STATUS_STR)
+ @async_mock.patch(HOOK_STATUS_STR)
async def
test_start_pipeline_trigger_on_success_should_execute_successfully(
self, mock_pipeline_status, trigger
):
@@ -94,7 +89,7 @@ class TestDataFusionStartPipelineTrigger:
)
@pytest.mark.asyncio
- @mock.patch(HOOK_STATUS_STR)
+ @async_mock.patch(HOOK_STATUS_STR)
async def test_start_pipeline_trigger_running_should_execute_successfully(
self, mock_pipeline_status, trigger, caplog
):
@@ -117,7 +112,7 @@ class TestDataFusionStartPipelineTrigger:
asyncio.get_event_loop().stop()
@pytest.mark.asyncio
- @mock.patch(HOOK_STATUS_STR)
+ @async_mock.patch(HOOK_STATUS_STR)
async def test_start_pipeline_trigger_error_should_execute_successfully(
self, mock_pipeline_status, trigger
):
@@ -131,7 +126,7 @@ class TestDataFusionStartPipelineTrigger:
assert TriggerEvent({"status": "error", "message": "error"}) == actual
@pytest.mark.asyncio
- @mock.patch(HOOK_STATUS_STR)
+ @async_mock.patch(HOOK_STATUS_STR)
async def
test_start_pipeline_trigger_exception_should_execute_successfully(
self, mock_pipeline_status, trigger
):