This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9692109c37 Improve Google provider trigger test (#30173)
9692109c37 is described below

commit 9692109c37e996ec31af33fbe205133bac5dc71a
Author: Pankaj Singh <[email protected]>
AuthorDate: Mon Mar 20 16:43:03 2023 +0530

    Improve Google provider trigger test (#30173)
---
 .../google/cloud/triggers/test_bigquery.py         | 1445 ++++++++------------
 .../google/cloud/triggers/test_bigquery_dts.py     |   25 +-
 .../google/cloud/triggers/test_datafusion.py       |   15 +-
 3 files changed, 591 insertions(+), 894 deletions(-)

diff --git a/tests/providers/google/cloud/triggers/test_bigquery.py b/tests/providers/google/cloud/triggers/test_bigquery.py
index f4a7773b87..410aa14b1a 100644
--- a/tests/providers/google/cloud/triggers/test_bigquery.py
+++ b/tests/providers/google/cloud/triggers/test_bigquery.py
@@ -22,7 +22,7 @@ from typing import Any
 
 import pytest
 from aiohttp import ClientResponseError, RequestInfo
-from gcloud.aio.bigquery import Table
+from gcloud.aio.bigquery import Job, Table
 from multidict import CIMultiDict
 from yarl import URL
 
@@ -41,9 +41,6 @@ from tests.providers.google.cloud.utils.compat import AsyncMock, async_mock
 
 TEST_CONN_ID = "bq_default"
 TEST_JOB_ID = "1234"
-RUN_ID = "1"
-RETRY_LIMIT = 2
-RETRY_DELAY = 1.0
 TEST_GCP_PROJECT_ID = "test-project"
 TEST_DATASET_ID = "bq_dataset"
 TEST_TABLE_ID = "bq_table"
@@ -63,68 +60,9 @@ TEST_HOOK_PARAMS: dict[str, Any] = {}
 TEST_PARTITION_ID = "1234"
 
 
-def test_bigquery_insert_job_op_trigger_serialization():
-    """
-    Asserts that the BigQueryInsertJobTrigger correctly serializes its arguments
-    and classpath.
-    """
-    trigger = BigQueryInsertJobTrigger(
-        TEST_CONN_ID,
-        TEST_JOB_ID,
-        TEST_GCP_PROJECT_ID,
-        TEST_DATASET_ID,
-        TEST_TABLE_ID,
-        POLLING_PERIOD_SECONDS,
-    )
-    classpath, kwargs = trigger.serialize()
-    assert classpath == "airflow.providers.google.cloud.triggers.bigquery.BigQueryInsertJobTrigger"
-    assert kwargs == {
-        "conn_id": TEST_CONN_ID,
-        "job_id": TEST_JOB_ID,
-        "project_id": TEST_GCP_PROJECT_ID,
-        "dataset_id": TEST_DATASET_ID,
-        "table_id": TEST_TABLE_ID,
-        "poll_interval": POLLING_PERIOD_SECONDS,
-    }
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_insert_job_op_trigger_success(mock_job_status):
-    """
-    Tests the BigQueryInsertJobTrigger only fires once the query execution reaches a successful state.
-    """
-    mock_job_status.return_value = "success"
-
-    trigger = BigQueryInsertJobTrigger(
-        TEST_CONN_ID,
-        TEST_JOB_ID,
-        TEST_GCP_PROJECT_ID,
-        TEST_DATASET_ID,
-        TEST_TABLE_ID,
-        POLLING_PERIOD_SECONDS,
-    )
-
-    generator = trigger.run()
-    actual = await generator.asend(None)
-    assert TriggerEvent({"status": "success", "message": "Job completed", "job_id": TEST_JOB_ID}) == actual
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance")
-async def test_bigquery_insert_job_trigger_running(mock_job_instance, caplog):
-    """
-    Test that BigQuery Triggers do not fire while a query is still running.
-    """
-
-    from gcloud.aio.bigquery import Job
-
-    mock_job_client = AsyncMock(Job)
-    mock_job_instance.return_value = mock_job_client
-    mock_job_instance.return_value.result.side_effect = OSError
-    caplog.set_level(logging.INFO)
-
-    trigger = BigQueryInsertJobTrigger(
+@pytest.fixture
+def insert_job_trigger():
+    return BigQueryInsertJobTrigger(
         conn_id=TEST_CONN_ID,
         job_id=TEST_JOB_ID,
         project_id=TEST_GCP_PROJECT_ID,
@@ -132,36 +70,11 @@ async def test_bigquery_insert_job_trigger_running(mock_job_instance, caplog):
         table_id=TEST_TABLE_ID,
         poll_interval=POLLING_PERIOD_SECONDS,
     )
-    task = asyncio.create_task(trigger.run().__anext__())
-    await asyncio.sleep(0.5)
-
-    # TriggerEvent was not returned
-    assert task.done() is False
-
-    assert f"Using the connection  {TEST_CONN_ID} ." in caplog.text
-
-    assert "Query is still running..." in caplog.text
-    assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
-
-    # Prevents error when task is destroyed while in "pending" state
-    asyncio.get_event_loop().stop()
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance")
-async def test_bigquery_get_data_trigger_running(mock_job_instance, caplog):
-    """
-    Test that BigQuery Triggers do not fire while a query is still running.
-    """
 
-    from gcloud.aio.bigquery import Job
 
-    mock_job_client = AsyncMock(Job)
-    mock_job_instance.return_value = mock_job_client
-    mock_job_instance.return_value.result.side_effect = OSError
-    caplog.set_level(logging.INFO)
-
-    trigger = BigQueryGetDataTrigger(
+@pytest.fixture
+def get_data_trigger():
+    return BigQueryGetDataTrigger(
         conn_id=TEST_CONN_ID,
         job_id=TEST_JOB_ID,
         project_id=TEST_GCP_PROJECT_ID,
@@ -169,69 +82,42 @@ async def test_bigquery_get_data_trigger_running(mock_job_instance, caplog):
         table_id=TEST_TABLE_ID,
         poll_interval=POLLING_PERIOD_SECONDS,
     )
-    task = asyncio.create_task(trigger.run().__anext__())
-    await asyncio.sleep(0.5)
-
-    # TriggerEvent was not returned
-    assert task.done() is False
-
-    assert f"Using the connection  {TEST_CONN_ID} ." in caplog.text
-
-    assert "Query is still running..." in caplog.text
-    assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
-
-    # Prevents error when task is destroyed while in "pending" state
-    asyncio.get_event_loop().stop()
-
 
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance")
-async def test_bigquery_check_trigger_running(mock_job_instance, caplog):
-    """
-    Test that BigQuery Triggers do not fire while a query is still running.
-    """
 
-    from gcloud.aio.bigquery import Job
+@pytest.fixture
+def table_existence_trigger():
+    return BigQueryTableExistenceTrigger(
+        TEST_GCP_PROJECT_ID,
+        TEST_DATASET_ID,
+        TEST_TABLE_ID,
+        TEST_GCP_CONN_ID,
+        TEST_HOOK_PARAMS,
+        POLLING_PERIOD_SECONDS,
+    )
 
-    mock_job_client = AsyncMock(Job)
-    mock_job_instance.return_value = mock_job_client
-    mock_job_instance.return_value.result.side_effect = OSError
-    caplog.set_level(logging.INFO)
 
-    trigger = BigQueryCheckTrigger(
+@pytest.fixture
+def interval_check_trigger():
+    return BigQueryIntervalCheckTrigger(
         conn_id=TEST_CONN_ID,
-        job_id=TEST_JOB_ID,
+        first_job_id=TEST_FIRST_JOB_ID,
+        second_job_id=TEST_SECOND_JOB_ID,
         project_id=TEST_GCP_PROJECT_ID,
+        table=TEST_TABLE_ID,
+        metrics_thresholds=TEST_METRIC_THRESHOLDS,
+        date_filter_column=TEST_DATE_FILTER_COLUMN,
+        days_back=TEST_DAYS_BACK,
+        ratio_formula=TEST_RATIO_FORMULA,
+        ignore_zero=TEST_IGNORE_ZERO,
         dataset_id=TEST_DATASET_ID,
         table_id=TEST_TABLE_ID,
         poll_interval=POLLING_PERIOD_SECONDS,
     )
-    task = asyncio.create_task(trigger.run().__anext__())
-    await asyncio.sleep(0.5)
 
-    # TriggerEvent was not returned
-    assert task.done() is False
 
-    assert f"Using the connection  {TEST_CONN_ID} ." in caplog.text
-
-    assert "Query is still running..." in caplog.text
-    assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
-
-    # Prevents error when task is destroyed while in "pending" state
-    asyncio.get_event_loop().stop()
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_op_trigger_terminated(mock_job_status, caplog):
-    """
-    Test that BigQuery Triggers fire the correct event in case of an error.
-    """
-    # Set the status to a value other than success or pending
-
-    mock_job_status.return_value = "error"
-
-    trigger = BigQueryInsertJobTrigger(
+@pytest.fixture
+def check_trigger():
+    return BigQueryCheckTrigger(
         conn_id=TEST_CONN_ID,
         job_id=TEST_JOB_ID,
         project_id=TEST_GCP_PROJECT_ID,
@@ -240,815 +126,636 @@ async def test_bigquery_op_trigger_terminated(mock_job_status, caplog):
         poll_interval=POLLING_PERIOD_SECONDS,
     )
 
-    generator = trigger.run()
-    actual = await generator.asend(None)
-    assert TriggerEvent({"status": "error", "message": "error"}) == actual
 
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_check_trigger_terminated(mock_job_status, caplog):
-    """
-    Test that BigQuery Triggers fire the correct event in case of an error.
-    """
-    # Set the status to a value other than success or pending
-
-    mock_job_status.return_value = "error"
-
-    trigger = BigQueryCheckTrigger(
+@pytest.fixture
+def value_check_trigger():
+    return BigQueryValueCheckTrigger(
         conn_id=TEST_CONN_ID,
+        pass_value=TEST_PASS_VALUE,
         job_id=TEST_JOB_ID,
-        project_id=TEST_GCP_PROJECT_ID,
         dataset_id=TEST_DATASET_ID,
+        project_id=TEST_GCP_PROJECT_ID,
+        sql=TEST_SQL_QUERY,
         table_id=TEST_TABLE_ID,
+        tolerance=TEST_TOLERANCE,
         poll_interval=POLLING_PERIOD_SECONDS,
     )
 
-    generator = trigger.run()
-    actual = await generator.asend(None)
-    assert TriggerEvent({"status": "error", "message": "error"}) == actual
 
+class TestBigQueryInsertJobTrigger:
+    def test_serialization(self, insert_job_trigger):
+        """
+        Asserts that the BigQueryInsertJobTrigger correctly serializes its arguments and classpath.
+        """
+        classpath, kwargs = insert_job_trigger.serialize()
+        assert classpath == "airflow.providers.google.cloud.triggers.bigquery.BigQueryInsertJobTrigger"
+        assert kwargs == {
+            "conn_id": TEST_CONN_ID,
+            "job_id": TEST_JOB_ID,
+            "project_id": TEST_GCP_PROJECT_ID,
+            "dataset_id": TEST_DATASET_ID,
+            "table_id": TEST_TABLE_ID,
+            "poll_interval": POLLING_PERIOD_SECONDS,
+        }
 
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_get_data_trigger_terminated(mock_job_status, caplog):
-    """
-    Test that BigQuery Triggers fire the correct event in case of an error.
-    """
-    # Set the status to a value other than success or pending
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    async def test_bigquery_insert_job_op_trigger_success(self, mock_job_status, insert_job_trigger):
+        """
+        Tests the BigQueryInsertJobTrigger only fires once the query execution reaches a successful state.
+        """
+        mock_job_status.return_value = "success"
 
-    mock_job_status.return_value = "error"
+        generator = insert_job_trigger.run()
+        actual = await generator.asend(None)
+        assert (
+            TriggerEvent({"status": "success", "message": "Job completed", "job_id": TEST_JOB_ID}) == actual
+        )
 
-    trigger = BigQueryGetDataTrigger(
-        conn_id=TEST_CONN_ID,
-        job_id=TEST_JOB_ID,
-        project_id=TEST_GCP_PROJECT_ID,
-        dataset_id=TEST_DATASET_ID,
-        table_id=TEST_TABLE_ID,
-        poll_interval=POLLING_PERIOD_SECONDS,
-    )
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance")
+    async def test_bigquery_insert_job_trigger_running(self, mock_job_instance, caplog, insert_job_trigger):
+        """Test that BigQuery Triggers do not fire while a query is still running."""
 
-    generator = trigger.run()
-    actual = await generator.asend(None)
-    assert TriggerEvent({"status": "error", "message": "error"}) == actual
+        mock_job_client = AsyncMock(Job)
+        mock_job_instance.return_value = mock_job_client
+        mock_job_instance.return_value.result.side_effect = OSError
+        caplog.set_level(logging.INFO)
 
+        task = asyncio.create_task(insert_job_trigger.run().__anext__())
+        await asyncio.sleep(0.5)
 
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_op_trigger_exception(mock_job_status, caplog):
-    """
-    Test that BigQuery Triggers fire the correct event in case of an error.
-    """
-    mock_job_status.side_effect = Exception("Test exception")
+        # TriggerEvent was not returned
+        assert task.done() is False
 
-    trigger = BigQueryInsertJobTrigger(
-        conn_id=TEST_CONN_ID,
-        job_id=TEST_JOB_ID,
-        project_id=TEST_GCP_PROJECT_ID,
-        dataset_id=TEST_DATASET_ID,
-        table_id=TEST_TABLE_ID,
-        poll_interval=POLLING_PERIOD_SECONDS,
-    )
+        assert "Query is still running..." in caplog.text
+        assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
 
-    generator = trigger.run()
-    actual = await generator.asend(None)
-    assert TriggerEvent({"status": "error", "message": "Test exception"}) == actual
+        # Prevents error when task is destroyed while in "pending" state
+        asyncio.get_event_loop().stop()
 
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    async def test_bigquery_op_trigger_terminated(self, mock_job_status, caplog, insert_job_trigger):
+        """Test that BigQuery Triggers fire the correct event in case of an error."""
+        # Set the status to a value other than success or pending
 
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_check_trigger_exception(mock_job_status, caplog):
-    """
-    Test that BigQuery Triggers fire the correct event in case of an error.
-    """
-    mock_job_status.side_effect = Exception("Test exception")
+        mock_job_status.return_value = "error"
 
-    trigger = BigQueryCheckTrigger(
-        conn_id=TEST_CONN_ID,
-        job_id=TEST_JOB_ID,
-        project_id=TEST_GCP_PROJECT_ID,
-        dataset_id=TEST_DATASET_ID,
-        table_id=TEST_TABLE_ID,
-        poll_interval=POLLING_PERIOD_SECONDS,
-    )
+        generator = insert_job_trigger.run()
+        actual = await generator.asend(None)
+        assert TriggerEvent({"status": "error", "message": "error"}) == actual
 
-    generator = trigger.run()
-    actual = await generator.asend(None)
-    assert TriggerEvent({"status": "error", "message": "Test exception"}) == actual
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    async def test_bigquery_op_trigger_exception(self, mock_job_status, caplog, insert_job_trigger):
+        """Test that BigQuery Triggers fire the correct event in case of an error."""
+        mock_job_status.side_effect = Exception("Test exception")
 
+        generator = insert_job_trigger.run()
+        actual = await generator.asend(None)
+        assert TriggerEvent({"status": "error", "message": "Test exception"}) == actual
 
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_get_data_trigger_exception(mock_job_status, caplog):
-    """
-    Test that BigQuery Triggers fire the correct event in case of an error.
-    """
-    mock_job_status.side_effect = Exception("Test exception")
 
-    trigger = BigQueryGetDataTrigger(
-        conn_id=TEST_CONN_ID,
-        job_id=TEST_JOB_ID,
-        project_id=TEST_GCP_PROJECT_ID,
-        dataset_id=TEST_DATASET_ID,
-        table_id=TEST_TABLE_ID,
-        poll_interval=POLLING_PERIOD_SECONDS,
-    )
+class TestBigQueryGetDataTrigger:
+    def test_bigquery_get_data_trigger_serialization(self, get_data_trigger):
+        """Asserts that the BigQueryGetDataTrigger correctly serializes its arguments and classpath."""
 
-    generator = trigger.run()
-    actual = await generator.asend(None)
-    assert TriggerEvent({"status": "error", "message": "Test exception"}) == actual
+        classpath, kwargs = get_data_trigger.serialize()
+        assert classpath == "airflow.providers.google.cloud.triggers.bigquery.BigQueryGetDataTrigger"
+        assert kwargs == {
+            "conn_id": TEST_CONN_ID,
+            "job_id": TEST_JOB_ID,
+            "dataset_id": TEST_DATASET_ID,
+            "project_id": TEST_GCP_PROJECT_ID,
+            "table_id": TEST_TABLE_ID,
+            "poll_interval": POLLING_PERIOD_SECONDS,
+        }
 
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance")
+    async def test_bigquery_get_data_trigger_running(self, mock_job_instance, caplog, get_data_trigger):
+        """Test that BigQuery Triggers do not fire while a query is still running."""
 
-def test_bigquery_check_op_trigger_serialization():
-    """
-    Asserts that the BigQueryCheckTrigger correctly serializes its arguments
-    and classpath.
-    """
-    trigger = BigQueryCheckTrigger(
-        TEST_CONN_ID,
-        TEST_JOB_ID,
-        TEST_GCP_PROJECT_ID,
-        TEST_DATASET_ID,
-        TEST_TABLE_ID,
-        POLLING_PERIOD_SECONDS,
-    )
-    classpath, kwargs = trigger.serialize()
-    assert classpath == "airflow.providers.google.cloud.triggers.bigquery.BigQueryCheckTrigger"
-    assert kwargs == {
-        "conn_id": TEST_CONN_ID,
-        "job_id": TEST_JOB_ID,
-        "dataset_id": TEST_DATASET_ID,
-        "project_id": TEST_GCP_PROJECT_ID,
-        "table_id": TEST_TABLE_ID,
-        "poll_interval": POLLING_PERIOD_SECONDS,
-    }
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
-async def test_bigquery_check_op_trigger_success_with_data(mock_job_output, mock_job_status):
-    """
-    Test the BigQueryCheckTrigger only fires once the query execution reaches a successful state.
-    """
-    mock_job_status.return_value = "success"
-    mock_job_output.return_value = {
-        "kind": "bigquery#getQueryResultsResponse",
-        "etag": "test_etag",
-        "schema": {"fields": [{"name": "f0_", "type": "INTEGER", "mode": "NULLABLE"}]},
-        "jobReference": {
-            "projectId": "test_airflow-providers",
-            "jobId": "test_jobid",
-            "location": "US",
-        },
-        "totalRows": "1",
-        "rows": [{"f": [{"v": "22"}]}],
-        "totalBytesProcessed": "0",
-        "jobComplete": True,
-        "cacheHit": False,
-    }
-
-    trigger = BigQueryCheckTrigger(
-        TEST_CONN_ID,
-        TEST_JOB_ID,
-        TEST_GCP_PROJECT_ID,
-        TEST_DATASET_ID,
-        TEST_TABLE_ID,
-        POLLING_PERIOD_SECONDS,
-    )
+        mock_job_client = AsyncMock(Job)
+        mock_job_instance.return_value = mock_job_client
+        mock_job_instance.return_value.result.side_effect = OSError
+        caplog.set_level(logging.INFO)
 
-    generator = trigger.run()
-    actual = await generator.asend(None)
-
-    assert TriggerEvent({"status": "success", "records": [22]}) == actual
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
-async def test_bigquery_check_op_trigger_success_without_data(mock_job_output, mock_job_status):
-    """
-    Tests that BigQueryCheckTrigger sends TriggerEvent as  { "status": "success", "records": None}
-    when no rows are available in the query result.
-    """
-    mock_job_status.return_value = "success"
-    mock_job_output.return_value = {
-        "kind": "bigquery#getQueryResultsResponse",
-        "etag": "test_etag",
-        "schema": {
-            "fields": [
-                {"name": "value", "type": "INTEGER", "mode": "NULLABLE"},
-                {"name": "name", "type": "STRING", "mode": "NULLABLE"},
-                {"name": "ds", "type": "DATE", "mode": "NULLABLE"},
-            ]
-        },
-        "jobReference": {
-            "projectId": "test_airflow-airflow-providers",
-            "jobId": "test_jobid",
-            "location": "US",
-        },
-        "totalRows": "0",
-        "totalBytesProcessed": "0",
-        "jobComplete": True,
-        "cacheHit": False,
-    }
-
-    trigger = BigQueryCheckTrigger(
-        TEST_CONN_ID,
-        TEST_JOB_ID,
-        TEST_GCP_PROJECT_ID,
-        TEST_DATASET_ID,
-        TEST_TABLE_ID,
-        POLLING_PERIOD_SECONDS,
-    )
-    generator = trigger.run()
-    actual = await generator.asend(None)
-    assert TriggerEvent({"status": "success", "records": None}) == actual
+        task = asyncio.create_task(get_data_trigger.run().__anext__())
+        await asyncio.sleep(0.5)
 
+        # TriggerEvent was not returned
+        assert task.done() is False
 
-def test_bigquery_get_data_trigger_serialization():
-    """
-    Asserts that the BigQueryGetDataTrigger correctly serializes its arguments
-    and classpath.
-    """
-    trigger = BigQueryGetDataTrigger(
-        conn_id=TEST_CONN_ID,
-        job_id=TEST_JOB_ID,
-        project_id=TEST_GCP_PROJECT_ID,
-        dataset_id=TEST_DATASET_ID,
-        table_id=TEST_TABLE_ID,
-        poll_interval=POLLING_PERIOD_SECONDS,
-    )
-    classpath, kwargs = trigger.serialize()
-    assert classpath == "airflow.providers.google.cloud.triggers.bigquery.BigQueryGetDataTrigger"
-    assert kwargs == {
-        "conn_id": TEST_CONN_ID,
-        "job_id": TEST_JOB_ID,
-        "dataset_id": TEST_DATASET_ID,
-        "project_id": TEST_GCP_PROJECT_ID,
-        "table_id": TEST_TABLE_ID,
-        "poll_interval": POLLING_PERIOD_SECONDS,
-    }
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
-async def test_bigquery_get_data_trigger_success_with_data(mock_job_output, mock_job_status):
-    """
-    Tests that BigQueryGetDataTrigger only fires once the query execution reaches a successful state.
-    """
-    mock_job_status.return_value = "success"
-    mock_job_output.return_value = {
-        "kind": "bigquery#tableDataList",
-        "etag": "test_etag",
-        "schema": {
-            "fields": [
-                {"name": "f0_", "type": "INTEGER", "mode": "NULLABLE"},
-                {"name": "f1_", "type": "STRING", "mode": "NULLABLE"},
-            ]
-        },
-        "jobReference": {
-            "projectId": "test-airflow-providers",
-            "jobId": "test_jobid",
-            "location": "US",
-        },
-        "totalRows": "10",
-        "rows": [{"f": [{"v": "42"}, {"v": "monthy python"}]}, {"f": [{"v": "42"}, {"v": "fishy fish"}]}],
-        "totalBytesProcessed": "0",
-        "jobComplete": True,
-        "cacheHit": False,
-    }
-
-    trigger = BigQueryGetDataTrigger(
-        TEST_CONN_ID,
-        TEST_JOB_ID,
-        TEST_GCP_PROJECT_ID,
-        TEST_DATASET_ID,
-        TEST_TABLE_ID,
-        POLLING_PERIOD_SECONDS,
-    )
+        assert "Query is still running..." in caplog.text
+        assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
+
+        # Prevents error when task is destroyed while in "pending" state
+        asyncio.get_event_loop().stop()
 
-    generator = trigger.run()
-    actual = await generator.asend(None)
-    # # The extracted row will be parsed and formatted to retrieve the value from the
-    # # structure - 'rows":[{"f":[{"v":"42"},{"v":"monthy python"}]},{"f":[{"v":"42"},{"v":"fishy fish"}]}]
-
-    assert (
-        TriggerEvent(
-            {
-                "status": "success",
-                "message": "success",
-                "records": [[42, "monthy python"], [42, "fishy fish"]],
-            }
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    async def test_bigquery_get_data_trigger_terminated(self, mock_job_status, caplog, get_data_trigger):
+        """Test that BigQuery Triggers fire the correct event in case of an error."""
+        # Set the status to a value other than success or pending
+
+        mock_job_status.return_value = "error"
+
+        generator = get_data_trigger.run()
+        actual = await generator.asend(None)
+        assert TriggerEvent({"status": "error", "message": "error"}) == actual
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    async def test_bigquery_get_data_trigger_exception(self, mock_job_status, caplog, get_data_trigger):
+        """Test that BigQuery Triggers fire the correct event in case of an error."""
+        mock_job_status.side_effect = Exception("Test exception")
+
+        generator = get_data_trigger.run()
+        actual = await generator.asend(None)
+        assert TriggerEvent({"status": "error", "message": "Test exception"}) == actual
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
+    async def test_bigquery_get_data_trigger_success_with_data(
+        self, mock_job_output, mock_job_status, get_data_trigger
+    ):
+        """
+        Tests that BigQueryGetDataTrigger only fires once the query execution reaches a successful state.
+        """
+        mock_job_status.return_value = "success"
+        mock_job_output.return_value = {
+            "kind": "bigquery#tableDataList",
+            "etag": "test_etag",
+            "schema": {
+                "fields": [
+                    {"name": "f0_", "type": "INTEGER", "mode": "NULLABLE"},
+                    {"name": "f1_", "type": "STRING", "mode": "NULLABLE"},
+                ]
+            },
+            "jobReference": {
+                "projectId": "test-airflow-providers",
+                "jobId": "test_jobid",
+                "location": "US",
+            },
+            "totalRows": "10",
+            "rows": [{"f": [{"v": "42"}, {"v": "monthy python"}]}, {"f": [{"v": "42"}, {"v": "fishy fish"}]}],
+            "totalBytesProcessed": "0",
+            "jobComplete": True,
+            "cacheHit": False,
+        }
+
+        generator = get_data_trigger.run()
+        actual = await generator.asend(None)
+        # The extracted row will be parsed and formatted to retrieve the value from the
+        # structure - 'rows":[{"f":[{"v":"42"},{"v":"monthy python"}]},{"f":[{"v":"42"},{"v":"fishy fish"}]}]
+
+        assert (
+            TriggerEvent(
+                {
+                    "status": "success",
+                    "message": "success",
+                    "records": [[42, "monthy python"], [42, "fishy fish"]],
+                }
+            )
+            == actual
         )
-        == actual
-    )
-    # Prevents error when task is destroyed while in "pending" state
-    asyncio.get_event_loop().stop()
-
-
-def test_bigquery_interval_check_trigger_serialization():
-    """
-    Asserts that the BigQueryIntervalCheckTrigger correctly serializes its arguments
-    and classpath.
-    """
-    trigger = BigQueryIntervalCheckTrigger(
-        TEST_CONN_ID,
-        TEST_FIRST_JOB_ID,
-        TEST_SECOND_JOB_ID,
-        TEST_GCP_PROJECT_ID,
-        TEST_TABLE_ID,
-        TEST_METRIC_THRESHOLDS,
-        TEST_DATE_FILTER_COLUMN,
-        TEST_DAYS_BACK,
-        TEST_RATIO_FORMULA,
-        TEST_IGNORE_ZERO,
-        TEST_DATASET_ID,
-        TEST_TABLE_ID,
-        POLLING_PERIOD_SECONDS,
-    )
-    classpath, kwargs = trigger.serialize()
-    assert classpath == "airflow.providers.google.cloud.triggers.bigquery.BigQueryIntervalCheckTrigger"
-    assert kwargs == {
-        "conn_id": TEST_CONN_ID,
-        "first_job_id": TEST_FIRST_JOB_ID,
-        "second_job_id": TEST_SECOND_JOB_ID,
-        "project_id": TEST_GCP_PROJECT_ID,
-        "table": TEST_TABLE_ID,
-        "metrics_thresholds": TEST_METRIC_THRESHOLDS,
-        "date_filter_column": TEST_DATE_FILTER_COLUMN,
-        "days_back": TEST_DAYS_BACK,
-        "ratio_formula": TEST_RATIO_FORMULA,
-        "ignore_zero": TEST_IGNORE_ZERO,
-    }
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
-async def test_bigquery_interval_check_trigger_success(mock_get_job_output, mock_job_status):
-    """
-    Tests the BigQueryIntervalCheckTrigger only fires once the query execution reaches a successful state.
-    """
-    mock_job_status.return_value = "success"
-    mock_get_job_output.return_value = ["0"]
-
-    trigger = BigQueryIntervalCheckTrigger(
-        conn_id=TEST_CONN_ID,
-        first_job_id=TEST_FIRST_JOB_ID,
-        second_job_id=TEST_SECOND_JOB_ID,
-        project_id=TEST_GCP_PROJECT_ID,
-        table=TEST_TABLE_ID,
-        metrics_thresholds=TEST_METRIC_THRESHOLDS,
-        date_filter_column=TEST_DATE_FILTER_COLUMN,
-        days_back=TEST_DAYS_BACK,
-        ratio_formula=TEST_RATIO_FORMULA,
-        ignore_zero=TEST_IGNORE_ZERO,
-        dataset_id=TEST_DATASET_ID,
-        table_id=TEST_TABLE_ID,
-        poll_interval=POLLING_PERIOD_SECONDS,
-    )
+        # Prevents error when task is destroyed while in "pending" state
+        asyncio.get_event_loop().stop()
 
-    generator = trigger.run()
-    actual = await generator.asend(None)
-    assert actual == TriggerEvent({"status": "error", "message": "The second SQL query returned None"})
 
+class TestBigQueryCheckTrigger:
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance")
+    async def test_bigquery_check_trigger_running(self, mock_job_instance, caplog, check_trigger):
+        """Test that BigQuery Triggers do not fire while a query is still running."""
 
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_interval_check_trigger_pending(mock_job_status, caplog):
-    """
-    Tests that the BigQueryIntervalCheckTrigger do not fire while a query is still running.
-    """
-    mock_job_status.return_value = "pending"
-    caplog.set_level(logging.INFO)
+        mock_job_client = AsyncMock(Job)
+        mock_job_instance.return_value = mock_job_client
+        mock_job_instance.return_value.result.side_effect = OSError
+        caplog.set_level(logging.INFO)
 
-    trigger = BigQueryIntervalCheckTrigger(
-        conn_id=TEST_CONN_ID,
-        first_job_id=TEST_FIRST_JOB_ID,
-        second_job_id=TEST_SECOND_JOB_ID,
-        project_id=TEST_GCP_PROJECT_ID,
-        table=TEST_TABLE_ID,
-        metrics_thresholds=TEST_METRIC_THRESHOLDS,
-        date_filter_column=TEST_DATE_FILTER_COLUMN,
-        days_back=TEST_DAYS_BACK,
-        ratio_formula=TEST_RATIO_FORMULA,
-        ignore_zero=TEST_IGNORE_ZERO,
-        dataset_id=TEST_DATASET_ID,
-        table_id=TEST_TABLE_ID,
-        poll_interval=POLLING_PERIOD_SECONDS,
-    )
-    task = asyncio.create_task(trigger.run().__anext__())
-    await asyncio.sleep(0.5)
+        task = asyncio.create_task(check_trigger.run().__anext__())
+        await asyncio.sleep(0.5)
 
-    # TriggerEvent was not returned
-    assert task.done() is False
+        # TriggerEvent was not returned
+        assert task.done() is False
 
-    assert f"Using the connection  {TEST_CONN_ID} ." in caplog.text
+        assert "Query is still running..." in caplog.text
+        assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
 
-    assert "Query is still running..." in caplog.text
-    assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
+        # Prevents error when task is destroyed while in "pending" state
+        asyncio.get_event_loop().stop()
 
-    # Prevents error when task is destroyed while in "pending" state
-    asyncio.get_event_loop().stop()
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    async def test_bigquery_check_trigger_terminated(self, mock_job_status, caplog, check_trigger):
+        """Test that BigQuery Triggers fire the correct event in case of an error."""
+        # Set the status to a value other than success or pending
 
+        mock_job_status.return_value = "error"
 
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_interval_check_trigger_terminated(mock_job_status):
-    """
-    Tests the BigQueryIntervalCheckTrigger fires the correct event in case of an error.
-    """
-    # Set the status to a value other than success or pending
-    mock_job_status.return_value = "error"
-    trigger = BigQueryIntervalCheckTrigger(
-        conn_id=TEST_CONN_ID,
-        first_job_id=TEST_FIRST_JOB_ID,
-        second_job_id=TEST_SECOND_JOB_ID,
-        project_id=TEST_GCP_PROJECT_ID,
-        table=TEST_TABLE_ID,
-        metrics_thresholds=TEST_METRIC_THRESHOLDS,
-        date_filter_column=TEST_DATE_FILTER_COLUMN,
-        days_back=TEST_DAYS_BACK,
-        ratio_formula=TEST_RATIO_FORMULA,
-        ignore_zero=TEST_IGNORE_ZERO,
-        dataset_id=TEST_DATASET_ID,
-        table_id=TEST_TABLE_ID,
-        poll_interval=POLLING_PERIOD_SECONDS,
-    )
+        generator = check_trigger.run()
+        actual = await generator.asend(None)
+        assert TriggerEvent({"status": "error", "message": "error"}) == actual
 
-    generator = trigger.run()
-    actual = await generator.asend(None)
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    async def test_check_trigger_exception(self, mock_job_status, caplog, check_trigger):
+        """Test that BigQuery Triggers fire the correct event in case of an error."""
+        mock_job_status.side_effect = Exception("Test exception")
 
-    assert TriggerEvent({"status": "error", "message": "error", "data": None}) == actual
+        generator = check_trigger.run()
+        actual = await generator.asend(None)
+        assert TriggerEvent({"status": "error", "message": "Test exception"}) == actual
 
+    def test_check_trigger_serialization(self, check_trigger):
+        """Asserts that the BigQueryCheckTrigger correctly serializes its arguments and classpath."""
 
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_interval_check_trigger_exception(mock_job_status, caplog):
-    """
-    Tests that the BigQueryIntervalCheckTrigger fires the correct event in case of an error.
-    """
-    mock_job_status.side_effect = Exception("Test exception")
-    caplog.set_level(logging.DEBUG)
+        classpath, kwargs = check_trigger.serialize()
+        assert classpath == "airflow.providers.google.cloud.triggers.bigquery.BigQueryCheckTrigger"
+        assert kwargs == {
+            "conn_id": TEST_CONN_ID,
+            "job_id": TEST_JOB_ID,
+            "dataset_id": TEST_DATASET_ID,
+            "project_id": TEST_GCP_PROJECT_ID,
+            "table_id": TEST_TABLE_ID,
+            "poll_interval": POLLING_PERIOD_SECONDS,
+        }
 
-    trigger = BigQueryIntervalCheckTrigger(
-        conn_id=TEST_CONN_ID,
-        first_job_id=TEST_FIRST_JOB_ID,
-        second_job_id=TEST_SECOND_JOB_ID,
-        project_id=TEST_GCP_PROJECT_ID,
-        table=TEST_TABLE_ID,
-        metrics_thresholds=TEST_METRIC_THRESHOLDS,
-        date_filter_column=TEST_DATE_FILTER_COLUMN,
-        days_back=TEST_DAYS_BACK,
-        ratio_formula=TEST_RATIO_FORMULA,
-        ignore_zero=TEST_IGNORE_ZERO,
-        dataset_id=TEST_DATASET_ID,
-        table_id=TEST_TABLE_ID,
-        poll_interval=POLLING_PERIOD_SECONDS,
-    )
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
+    async def test_check_trigger_success_with_data(self, mock_job_output, mock_job_status, check_trigger):
+        """
+        Test the BigQueryCheckTrigger only fires once the query execution reaches a successful state.
+        """
+        mock_job_status.return_value = "success"
+        mock_job_output.return_value = {
+            "kind": "bigquery#getQueryResultsResponse",
+            "etag": "test_etag",
+            "schema": {"fields": [{"name": "f0_", "type": "INTEGER", "mode": "NULLABLE"}]},
+            "jobReference": {
+                "projectId": "test_airflow-providers",
+                "jobId": "test_jobid",
+                "location": "US",
+            },
+            "totalRows": "1",
+            "rows": [{"f": [{"v": "22"}]}],
+            "totalBytesProcessed": "0",
+            "jobComplete": True,
+            "cacheHit": False,
+        }
 
-    # trigger event is yielded so it creates a generator object
-    # so i have used async for to get all the values and added it to task
-    task = [i async for i in trigger.run()]
-    # since we use return as soon as we yield the trigger event
-    # at any given point there should be one trigger event returned to the task
-    # so we validate for length of task to be 1
+        generator = check_trigger.run()
+        actual = await generator.asend(None)
 
-    assert len(task) == 1
-    assert TriggerEvent({"status": "error", "message": "Test exception"}) in task
+        assert TriggerEvent({"status": "success", "records": [22]}) == actual
 
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
+    async def test_check_trigger_success_without_data(self, mock_job_output, mock_job_status, check_trigger):
+        """
+        Tests that BigQueryCheckTrigger sends TriggerEvent as  { "status": "success", "records": None}
+        when no rows are available in the query result.
+        """
+        mock_job_status.return_value = "success"
+        mock_job_output.return_value = {
+            "kind": "bigquery#getQueryResultsResponse",
+            "etag": "test_etag",
+            "schema": {
+                "fields": [
+                    {"name": "value", "type": "INTEGER", "mode": "NULLABLE"},
+                    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
+                    {"name": "ds", "type": "DATE", "mode": "NULLABLE"},
+                ]
+            },
+            "jobReference": {
+                "projectId": "test_airflow-airflow-providers",
+                "jobId": "test_jobid",
+                "location": "US",
+            },
+            "totalRows": "0",
+            "totalBytesProcessed": "0",
+            "jobComplete": True,
+            "cacheHit": False,
+        }
 
-def test_bigquery_value_check_op_trigger_serialization():
-    """
-    Asserts that the BigQueryValueCheckTrigger correctly serializes its arguments
-    and classpath.
-    """
+        generator = check_trigger.run()
+        actual = await generator.asend(None)
+        assert TriggerEvent({"status": "success", "records": None}) == actual
 
-    trigger = BigQueryValueCheckTrigger(
-        conn_id=TEST_CONN_ID,
-        pass_value=TEST_PASS_VALUE,
-        job_id=TEST_JOB_ID,
-        dataset_id=TEST_DATASET_ID,
-        project_id=TEST_GCP_PROJECT_ID,
-        sql=TEST_SQL_QUERY,
-        table_id=TEST_TABLE_ID,
-        tolerance=TEST_TOLERANCE,
-        poll_interval=POLLING_PERIOD_SECONDS,
-    )
-    classpath, kwargs = trigger.serialize()
-
-    assert classpath == "airflow.providers.google.cloud.triggers.bigquery.BigQueryValueCheckTrigger"
-    assert kwargs == {
-        "conn_id": TEST_CONN_ID,
-        "pass_value": TEST_PASS_VALUE,
-        "job_id": TEST_JOB_ID,
-        "dataset_id": TEST_DATASET_ID,
-        "project_id": TEST_GCP_PROJECT_ID,
-        "sql": TEST_SQL_QUERY,
-        "table_id": TEST_TABLE_ID,
-        "tolerance": TEST_TOLERANCE,
-        "poll_interval": POLLING_PERIOD_SECONDS,
-    }
-
-
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_records")
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_value_check_op_trigger_success(mock_job_status, get_job_output, get_records):
-    """
-    Tests that the BigQueryValueCheckTrigger only fires once the query execution reaches a successful state.
-    """
-    mock_job_status.return_value = "success"
-    get_job_output.return_value = {}
-    get_records.return_value = [[2], [4]]
-
-    trigger = BigQueryValueCheckTrigger(
-        conn_id=TEST_CONN_ID,
-        pass_value=TEST_PASS_VALUE,
-        job_id=TEST_JOB_ID,
-        dataset_id=TEST_DATASET_ID,
-        project_id=TEST_GCP_PROJECT_ID,
-        sql=TEST_SQL_QUERY,
-        table_id=TEST_TABLE_ID,
-        tolerance=TEST_TOLERANCE,
-        poll_interval=POLLING_PERIOD_SECONDS,
-    )
 
-    asyncio.create_task(trigger.run().__anext__())
-    await asyncio.sleep(0.5)
+class TestBigQueryIntervalCheckTrigger:
+    def test_interval_check_trigger_serialization(self, interval_check_trigger):
+        """
+        Asserts that the BigQueryIntervalCheckTrigger correctly serializes its arguments and classpath.
+        """
+
+        classpath, kwargs = interval_check_trigger.serialize()
+        assert classpath == "airflow.providers.google.cloud.triggers.bigquery.BigQueryIntervalCheckTrigger"
+        assert kwargs == {
+            "conn_id": TEST_CONN_ID,
+            "first_job_id": TEST_FIRST_JOB_ID,
+            "second_job_id": TEST_SECOND_JOB_ID,
+            "project_id": TEST_GCP_PROJECT_ID,
+            "table": TEST_TABLE_ID,
+            "metrics_thresholds": TEST_METRIC_THRESHOLDS,
+            "date_filter_column": TEST_DATE_FILTER_COLUMN,
+            "days_back": TEST_DAYS_BACK,
+            "ratio_formula": TEST_RATIO_FORMULA,
+            "ignore_zero": TEST_IGNORE_ZERO,
+        }
 
-    generator = trigger.run()
-    actual = await generator.asend(None)
-    assert actual == TriggerEvent({"status": "success", "message": "Job completed", "records": [4]})
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
+    async def test_interval_check_trigger_success(
+        self, mock_get_job_output, mock_job_status, interval_check_trigger
+    ):
+        """
+        Tests the BigQueryIntervalCheckTrigger only fires once the query execution reaches a successful state.
+        """
+        mock_job_status.return_value = "success"
+        mock_get_job_output.return_value = ["0"]
 
+        generator = interval_check_trigger.run()
+        actual = await generator.asend(None)
+        assert actual == TriggerEvent({"status": "error", "message": "The second SQL query returned None"})
 
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_value_check_op_trigger_pending(mock_job_status, caplog):
-    """
-    Tests that the BigQueryValueCheckTrigger only fires once the query execution reaches a successful state.
-    """
-    mock_job_status.return_value = "pending"
-    caplog.set_level(logging.INFO)
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    async def test_interval_check_trigger_pending(self, mock_job_status, caplog, interval_check_trigger):
+        """
+        Tests that the BigQueryIntervalCheckTrigger do not fire while a query is still running.
+        """
+        mock_job_status.return_value = "pending"
+        caplog.set_level(logging.INFO)
 
-    trigger = BigQueryValueCheckTrigger(
-        TEST_CONN_ID,
-        TEST_PASS_VALUE,
-        TEST_JOB_ID,
-        TEST_DATASET_ID,
-        TEST_GCP_PROJECT_ID,
-        TEST_SQL_QUERY,
-        TEST_TABLE_ID,
-        TEST_TOLERANCE,
-        POLLING_PERIOD_SECONDS,
-    )
+        task = asyncio.create_task(interval_check_trigger.run().__anext__())
+        await asyncio.sleep(0.5)
 
-    task = asyncio.create_task(trigger.run().__anext__())
-    await asyncio.sleep(0.5)
+        # TriggerEvent was not returned
+        assert task.done() is False
 
-    # TriggerEvent was returned
-    assert task.done() is False
+        assert "Query is still running..." in caplog.text
+        assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
 
-    assert "Query is still running..." in caplog.text
+        # Prevents error when task is destroyed while in "pending" state
+        asyncio.get_event_loop().stop()
 
-    assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    async def test_interval_check_trigger_terminated(self, mock_job_status, interval_check_trigger):
+        """Tests the BigQueryIntervalCheckTrigger fires the correct event in case of an error."""
+        # Set the status to a value other than success or pending
+        mock_job_status.return_value = "error"
 
-    # Prevents error when task is destroyed while in "pending" state
-    asyncio.get_event_loop().stop()
+        generator = interval_check_trigger.run()
+        actual = await generator.asend(None)
 
+        assert TriggerEvent({"status": "error", "message": "error", "data": None}) == actual
 
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_value_check_op_trigger_fail(mock_job_status):
-    """
-    Tests that the BigQueryValueCheckTrigger only fires once the query execution reaches a successful state.
-    """
-    mock_job_status.return_value = "dummy"
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    async def test_interval_check_trigger_exception(self, mock_job_status, caplog, interval_check_trigger):
+        """Tests that the BigQueryIntervalCheckTrigger fires the correct event in case of an error."""
+        mock_job_status.side_effect = Exception("Test exception")
+        caplog.set_level(logging.DEBUG)
 
-    trigger = BigQueryValueCheckTrigger(
-        TEST_CONN_ID,
-        TEST_PASS_VALUE,
-        TEST_JOB_ID,
-        TEST_DATASET_ID,
-        TEST_GCP_PROJECT_ID,
-        TEST_SQL_QUERY,
-        TEST_TABLE_ID,
-        TEST_TOLERANCE,
-        POLLING_PERIOD_SECONDS,
-    )
+        # trigger event is yielded so it creates a generator object
+        # so i have used async for to get all the values and added it to task
+        task = [i async for i in interval_check_trigger.run()]
+        # since we use return as soon as we yield the trigger event
+        # at any given point there should be one trigger event returned to the task
+        # so we validate for length of task to be 1
 
-    generator = trigger.run()
-    actual = await generator.asend(None)
-    assert TriggerEvent({"status": "error", "message": "dummy", "records": None}) == actual
+        assert len(task) == 1
+        assert TriggerEvent({"status": "error", "message": "Test exception"}) in task
 
 
-@pytest.mark.asyncio
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
-async def test_bigquery_value_check_trigger_exception(mock_job_status):
-    """
-    Tests the BigQueryValueCheckTrigger does not fire if there is an exception.
-    """
-    mock_job_status.side_effect = Exception("Test exception")
+class TestBigQueryValueCheckTrigger:
+    def test_bigquery_value_check_op_trigger_serialization(self, value_check_trigger):
+        """Asserts that the BigQueryValueCheckTrigger correctly serializes its arguments and classpath."""
 
-    trigger = BigQueryValueCheckTrigger(
-        conn_id=TEST_CONN_ID,
-        sql=TEST_SQL_QUERY,
-        pass_value=TEST_PASS_VALUE,
-        tolerance=1,
-        job_id=TEST_JOB_ID,
-        project_id=TEST_GCP_PROJECT_ID,
-    )
+        classpath, kwargs = value_check_trigger.serialize()
 
-    # trigger event is yielded so it creates a generator object
-    # so i have used async for to get all the values and added it to task
-    task = [i async for i in trigger.run()]
-    # since we use return as soon as we yield the trigger event
-    # at any given point there should be one trigger event returned to the task
-    # so we validate for length of task to be 1
+        assert classpath == "airflow.providers.google.cloud.triggers.bigquery.BigQueryValueCheckTrigger"
+        assert kwargs == {
+            "conn_id": TEST_CONN_ID,
+            "pass_value": TEST_PASS_VALUE,
+            "job_id": TEST_JOB_ID,
+            "dataset_id": TEST_DATASET_ID,
+            "project_id": TEST_GCP_PROJECT_ID,
+            "sql": TEST_SQL_QUERY,
+            "table_id": TEST_TABLE_ID,
+            "tolerance": TEST_TOLERANCE,
+            "poll_interval": POLLING_PERIOD_SECONDS,
+        }
 
-    assert len(task) == 1
-    assert TriggerEvent({"status": "error", "message": "Test exception"}) in task
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_records")
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_output")
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    async def test_value_check_op_trigger_success(
+        self, mock_job_status, get_job_output, get_records, value_check_trigger
+    ):
+        """
+        Tests BigQueryValueCheckTrigger only fires once the query execution reaches a successful state.
+        """
+        mock_job_status.return_value = "success"
+        get_job_output.return_value = {}
+        get_records.return_value = [[2], [4]]
 
+        asyncio.create_task(value_check_trigger.run().__anext__())
+        await asyncio.sleep(0.5)
 
-def test_big_query_table_existence_trigger_serialization():
-    """
-    Asserts that the BigQueryTableExistenceTrigger correctly serializes its arguments
-    and classpath.
-    """
-    trigger = BigQueryTableExistenceTrigger(
-        TEST_GCP_PROJECT_ID,
-        TEST_DATASET_ID,
-        TEST_TABLE_ID,
-        TEST_GCP_CONN_ID,
-        TEST_HOOK_PARAMS,
-        POLLING_PERIOD_SECONDS,
-    )
-    classpath, kwargs = trigger.serialize()
-    assert classpath == "airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger"
-    assert kwargs == {
-        "dataset_id": TEST_DATASET_ID,
-        "project_id": TEST_GCP_PROJECT_ID,
-        "table_id": TEST_TABLE_ID,
-        "gcp_conn_id": TEST_GCP_CONN_ID,
-        "poll_interval": POLLING_PERIOD_SECONDS,
-        "hook_params": TEST_HOOK_PARAMS,
-    }
-
-
-@pytest.mark.asyncio
-@async_mock.patch(
-    "airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger._table_exists"
-)
-async def test_big_query_table_existence_trigger_success(mock_table_exists):
-    """
-    Tests success case BigQueryTableExistenceTrigger
-    """
-    mock_table_exists.return_value = True
+        generator = value_check_trigger.run()
+        actual = await generator.asend(None)
+        assert actual == TriggerEvent({"status": "success", "message": "Job completed", "records": [4]})
 
-    trigger = BigQueryTableExistenceTrigger(
-        TEST_GCP_PROJECT_ID,
-        TEST_DATASET_ID,
-        TEST_TABLE_ID,
-        TEST_GCP_CONN_ID,
-        TEST_HOOK_PARAMS,
-        POLLING_PERIOD_SECONDS,
-    )
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    async def test_value_check_op_trigger_pending(self, mock_job_status, caplog, value_check_trigger):
+        """
+        Tests BigQueryValueCheckTrigger only fires once the query execution reaches a successful state.
+        """
+        mock_job_status.return_value = "pending"
+        caplog.set_level(logging.INFO)
 
-    generator = trigger.run()
-    actual = await generator.asend(None)
-    assert TriggerEvent({"status": "success", "message": "success"}) == actual
+        task = asyncio.create_task(value_check_trigger.run().__anext__())
+        await asyncio.sleep(0.5)
 
+        # TriggerEvent was returned
+        assert task.done() is False
 
-@pytest.mark.asyncio
-@async_mock.patch(
-    "airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger._table_exists"
-)
-async def test_big_query_table_existence_trigger_pending(mock_table_exists):
-    """
-    Test that BigQueryTableExistenceTrigger is in loop till the table exist.
-    """
-    mock_table_exists.return_value = False
+        assert "Query is still running..." in caplog.text
 
-    trigger = BigQueryTableExistenceTrigger(
-        TEST_GCP_PROJECT_ID,
-        TEST_DATASET_ID,
-        TEST_TABLE_ID,
-        TEST_GCP_CONN_ID,
-        TEST_HOOK_PARAMS,
-        POLLING_PERIOD_SECONDS,
-    )
-    task = asyncio.create_task(trigger.run().__anext__())
-    await asyncio.sleep(0.5)
+        assert f"Sleeping for {POLLING_PERIOD_SECONDS} seconds." in caplog.text
 
-    # TriggerEvent was not returned
-    assert task.done() is False
-    asyncio.get_event_loop().stop()
+        # Prevents error when task is destroyed while in "pending" state
+        asyncio.get_event_loop().stop()
 
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    async def test_value_check_op_trigger_fail(self, mock_job_status, value_check_trigger):
+        """
+        Tests BigQueryValueCheckTrigger only fires once the query execution reaches a successful state.
+        """
+        mock_job_status.return_value = "dummy"
+
+        generator = value_check_trigger.run()
+        actual = await generator.asend(None)
+        assert TriggerEvent({"status": "error", "message": "dummy", "records": None}) == actual
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
+    async def test_value_check_trigger_exception(self, mock_job_status):
+        """Tests the BigQueryValueCheckTrigger does not fire if there is an exception."""
+        mock_job_status.side_effect = Exception("Test exception")
+
+        trigger = BigQueryValueCheckTrigger(
+            conn_id=TEST_CONN_ID,
+            sql=TEST_SQL_QUERY,
+            pass_value=TEST_PASS_VALUE,
+            tolerance=1,
+            job_id=TEST_JOB_ID,
+            project_id=TEST_GCP_PROJECT_ID,
+        )
 
-@pytest.mark.asyncio
-@async_mock.patch(
-    "airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger._table_exists"
-)
-async def test_big_query_table_existence_trigger_exception(mock_table_exists):
-    """
-    Test BigQueryTableExistenceTrigger throws exception if any error.
-    """
-    mock_table_exists.side_effect = AsyncMock(side_effect=Exception("Test exception"))
+        # trigger.run() is an async generator, so collect every event it yields
+        # into a list with an async comprehension.
+        task = [i async for i in trigger.run()]
+        # The trigger returns right after yielding the error event, so exactly
+        # one TriggerEvent should have been collected.
 
-    trigger = BigQueryTableExistenceTrigger(
-        TEST_GCP_PROJECT_ID,
-        TEST_DATASET_ID,
-        TEST_TABLE_ID,
-        TEST_GCP_CONN_ID,
-        TEST_HOOK_PARAMS,
-        POLLING_PERIOD_SECONDS,
-    )
-    task = [i async for i in trigger.run()]
-    assert len(task) == 1
-    assert TriggerEvent({"status": "error", "message": "Test exception"}) in task
-
-
[email protected]
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook.get_table_client")
-async def test_table_exists(mock_get_table_client):
-    """Test BigQueryTableExistenceTrigger._table_exists async function with 
mocked value
-    and mocked return value"""
-    hook = BigQueryTableAsyncHook()
-    mock_get_table_client.return_value = AsyncMock(Table)
-    trigger = BigQueryTableExistenceTrigger(
-        TEST_GCP_PROJECT_ID,
-        TEST_DATASET_ID,
-        TEST_TABLE_ID,
-        TEST_GCP_CONN_ID,
-        TEST_HOOK_PARAMS,
-        POLLING_PERIOD_SECONDS,
-    )
-    res = await trigger._table_exists(hook, TEST_DATASET_ID, TEST_TABLE_ID, TEST_GCP_PROJECT_ID)
-    assert res is True
-
-
[email protected]
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook.get_table_client")
-async def test_table_exists_exception(mock_get_table_client):
-    """Test BigQueryTableExistenceTrigger._table_exists async function with 
exception and return False"""
-    hook = BigQueryTableAsyncHook()
-    mock_get_table_client.side_effect = ClientResponseError(
-        history=(),
-        request_info=RequestInfo(
-            headers=CIMultiDict(),
-            real_url=URL("https://example.com"),
-            method="GET",
-            url=URL("https://example.com"),
-        ),
-        status=404,
-        message="Not Found",
-    )
-    trigger = BigQueryTableExistenceTrigger(
-        TEST_GCP_PROJECT_ID,
-        TEST_DATASET_ID,
-        TEST_TABLE_ID,
-        TEST_GCP_CONN_ID,
-        TEST_HOOK_PARAMS,
-        POLLING_PERIOD_SECONDS,
-    )
-    res = await trigger._table_exists(hook, TEST_DATASET_ID, TEST_TABLE_ID, TEST_GCP_PROJECT_ID)
-    expected_response = False
-    assert res == expected_response
-
-
[email protected]
-@async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook.get_table_client")
-async def test_table_exists_raise_exception(mock_get_table_client):
-    """Test BigQueryTableExistenceTrigger._table_exists async function with 
raise exception"""
-    hook = BigQueryTableAsyncHook()
-    mock_get_table_client.side_effect = ClientResponseError(
-        history=(),
-        request_info=RequestInfo(
-            headers=CIMultiDict(),
-            real_url=URL("https://example.com"),
-            method="GET",
-            url=URL("https://example.com"),
-        ),
-        status=400,
-        message="Not Found",
-    )
-    trigger = BigQueryTableExistenceTrigger(
-        TEST_GCP_PROJECT_ID,
-        TEST_DATASET_ID,
-        TEST_TABLE_ID,
-        TEST_GCP_CONN_ID,
-        TEST_HOOK_PARAMS,
-        POLLING_PERIOD_SECONDS,
-    )
-    with pytest.raises(ClientResponseError):
-        await trigger._table_exists(hook, TEST_DATASET_ID, TEST_TABLE_ID, TEST_GCP_PROJECT_ID)
+        assert len(task) == 1
+        assert TriggerEvent({"status": "error", "message": "Test exception"}) in task
+
+
+class TestBigQueryTableExistenceTrigger:
+    def test_table_existence_trigger_serialization(self, table_existence_trigger):
+        """
+        Asserts that the BigQueryTableExistenceTrigger correctly serializes its arguments and classpath.
+        """
+
+        classpath, kwargs = table_existence_trigger.serialize()
+        assert classpath == "airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger"
+        assert kwargs == {
+            "dataset_id": TEST_DATASET_ID,
+            "project_id": TEST_GCP_PROJECT_ID,
+            "table_id": TEST_TABLE_ID,
+            "gcp_conn_id": TEST_GCP_CONN_ID,
+            "poll_interval": POLLING_PERIOD_SECONDS,
+            "hook_params": TEST_HOOK_PARAMS,
+        }
+
+    @pytest.mark.asyncio
+    @async_mock.patch(
+        "airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger._table_exists"
+    )
+    async def test_big_query_table_existence_trigger_success(
+        self, mock_table_exists, table_existence_trigger
+    ):
+        """Tests success case BigQueryTableExistenceTrigger"""
+        mock_table_exists.return_value = True
+
+        generator = table_existence_trigger.run()
+        actual = await generator.asend(None)
+        assert TriggerEvent({"status": "success", "message": "success"}) == actual
+
+    @pytest.mark.asyncio
+    @async_mock.patch(
+        "airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger._table_exists"
+    )
+    async def test_table_existence_trigger_pending(self, mock_table_exists, table_existence_trigger):
+        """Test that BigQueryTableExistenceTrigger keeps looping until the table exists."""
+        mock_table_exists.return_value = False
+
+        task = asyncio.create_task(table_existence_trigger.run().__anext__())
+        await asyncio.sleep(0.5)
+
+        # TriggerEvent was not returned
+        assert task.done() is False
+        asyncio.get_event_loop().stop()
+
+    @pytest.mark.asyncio
+    @async_mock.patch(
+        "airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger._table_exists"
+    )
+    async def test_table_existence_trigger_exception(self, mock_table_exists, table_existence_trigger):
+        """Test that BigQueryTableExistenceTrigger fires an error event when an exception is raised."""
+        mock_table_exists.side_effect = AsyncMock(side_effect=Exception("Test exception"))
+
+        task = [i async for i in table_existence_trigger.run()]
+        assert len(task) == 1
+        assert TriggerEvent({"status": "error", "message": "Test exception"}) in task
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook.get_table_client")
+    async def test_table_exists(self, mock_get_table_client, table_existence_trigger):
+        """Test that BigQueryTableExistenceTrigger._table_exists returns True when the mocked table client resolves the table."""
+        hook = BigQueryTableAsyncHook()
+        mock_get_table_client.return_value = AsyncMock(Table)
+
+        res = await table_existence_trigger._table_exists(
+            hook, TEST_DATASET_ID, TEST_TABLE_ID, TEST_GCP_PROJECT_ID
+        )
+        assert res is True
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook.get_table_client")
+    async def test_table_exists_exception(self, mock_get_table_client, table_existence_trigger):
+        """Test that BigQueryTableExistenceTrigger._table_exists returns False when the table client raises a 404."""
+        hook = BigQueryTableAsyncHook()
+        mock_get_table_client.side_effect = ClientResponseError(
+            history=(),
+            request_info=RequestInfo(
+                headers=CIMultiDict(),
+                real_url=URL("https://example.com"),
+                method="GET",
+                url=URL("https://example.com"),
+            ),
+            status=404,
+            message="Not Found",
+        )
+
+        res = await table_existence_trigger._table_exists(
+            hook, TEST_DATASET_ID, TEST_TABLE_ID, TEST_GCP_PROJECT_ID
+        )
+        expected_response = False
+        assert res == expected_response
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook.get_table_client")
+    async def test_table_exists_raise_exception(self, mock_get_table_client, table_existence_trigger):
+        """Test that BigQueryTableExistenceTrigger._table_exists re-raises client errors other than 404."""
+        hook = BigQueryTableAsyncHook()
+        mock_get_table_client.side_effect = ClientResponseError(
+            history=(),
+            request_info=RequestInfo(
+                headers=CIMultiDict(),
+                real_url=URL("https://example.com"),
+                method="GET",
+                url=URL("https://example.com"),
+            ),
+            status=400,
+            message="Not Found",
+        )
+
+        with pytest.raises(ClientResponseError):
+            await table_existence_trigger._table_exists(
+                hook, TEST_DATASET_ID, TEST_TABLE_ID, TEST_GCP_PROJECT_ID
+            )
 
 
 class TestBigQueryTablePartitionExistenceTrigger:
-    def test_big_query_table_existence_partition_trigger_serialization_should_execute_successfully(self):
+    def test_serialization_successfully(self):
         """
         Asserts that the BigQueryTablePartitionExistenceTrigger correctly serializes its arguments
         and classpath.
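
For context: the class-based tests above receive pytest fixtures named value_check_trigger and table_existence_trigger that are defined earlier in the test module. A minimal sketch of how such fixtures could look; the keyword arguments are assumptions inferred from the serialization asserts and from the explicit constructor call in test_value_check_trigger_exception, reusing the module-level TEST_* constants:

    import pytest

    from airflow.providers.google.cloud.triggers.bigquery import (
        BigQueryTableExistenceTrigger,
        BigQueryValueCheckTrigger,
    )


    @pytest.fixture
    def value_check_trigger():
        # Assumed arguments, mirroring the explicit construction used in
        # test_value_check_trigger_exception.
        return BigQueryValueCheckTrigger(
            conn_id=TEST_CONN_ID,
            sql=TEST_SQL_QUERY,
            pass_value=TEST_PASS_VALUE,
            tolerance=1,
            job_id=TEST_JOB_ID,
            project_id=TEST_GCP_PROJECT_ID,
        )


    @pytest.fixture
    def table_existence_trigger():
        # Assumed arguments, mirroring the kwargs asserted in
        # test_table_existence_trigger_serialization.
        return BigQueryTableExistenceTrigger(
            project_id=TEST_GCP_PROJECT_ID,
            dataset_id=TEST_DATASET_ID,
            table_id=TEST_TABLE_ID,
            gcp_conn_id=TEST_GCP_CONN_ID,
            hook_params=TEST_HOOK_PARAMS,
            poll_interval=POLLING_PERIOD_SECONDS,
        )
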
diff --git a/tests/providers/google/cloud/triggers/test_bigquery_dts.py b/tests/providers/google/cloud/triggers/test_bigquery_dts.py
index 46074a6a54..866e4756cf 100644
--- a/tests/providers/google/cloud/triggers/test_bigquery_dts.py
+++ b/tests/providers/google/cloud/triggers/test_bigquery_dts.py
@@ -19,18 +19,13 @@ from __future__ import annotations
 
 import asyncio
 import logging
-import sys
 
 import pytest
 from google.cloud.bigquery_datatransfer_v1 import TransferState
 
 from airflow.providers.google.cloud.triggers.bigquery_dts import BigQueryDataTransferRunTrigger
 from airflow.triggers.base import TriggerEvent
-
-if sys.version_info < (3, 8):
-    from asynctest import mock
-else:
-    from unittest import mock
+from tests.providers.google.cloud.utils.compat import async_mock
 
 PROJECT_ID = "test-project-id"
 CONFIG_ID = "test-config-id"
@@ -91,9 +86,9 @@ class TestBigQueryDataTransferRunTrigger:
         assert actual_value == expected_value
 
     @pytest.mark.asyncio
-    @mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
+    @async_mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
     async def test_run_returns_success_event(self, mock_hook, trigger):
-        mock_hook.return_value = mock.MagicMock(state=TransferState.SUCCEEDED)
+        mock_hook.return_value = async_mock.MagicMock(state=TransferState.SUCCEEDED)
         expected_event = TriggerEvent(
             {
                 "run_id": RUN_ID,
@@ -107,9 +102,9 @@ class TestBigQueryDataTransferRunTrigger:
         assert actual_event == expected_event
 
     @pytest.mark.asyncio
-    @mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
+    @async_mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
     async def test_run_returns_failed_event(self, mock_hook, trigger):
-        mock_hook.return_value = mock.MagicMock(state=TransferState.FAILED)
+        mock_hook.return_value = async_mock.MagicMock(state=TransferState.FAILED)
         expected_event = TriggerEvent(
             {
                 "status": "failed",
@@ -122,7 +117,7 @@ class TestBigQueryDataTransferRunTrigger:
         assert actual_event == expected_event
 
     @pytest.mark.asyncio
-    @mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
+    @async_mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
     async def test_run_returns_exception_event(self, mock_hook, trigger):
         error_msg = "test error msg"
         mock_hook.side_effect = Exception(error_msg)
@@ -137,9 +132,9 @@ class TestBigQueryDataTransferRunTrigger:
         assert actual_event == expected_event
 
     @pytest.mark.asyncio
-    @mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
+    @async_mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
     async def test_run_returns_cancelled_event(self, mock_hook, trigger):
-        mock_hook.return_value = mock.MagicMock(state=TransferState.CANCELLED)
+        mock_hook.return_value = async_mock.MagicMock(state=TransferState.CANCELLED)
         expected_event = TriggerEvent(
             {
                 "status": "cancelled",
@@ -152,9 +147,9 @@ class TestBigQueryDataTransferRunTrigger:
         assert actual_event == expected_event
 
     @pytest.mark.asyncio
-    @mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
+    @async_mock.patch(f"{TRIGGER_MODULE_PATH}.AsyncBiqQueryDataTransferServiceHook.get_transfer_run")
     async def test_run_loop_is_still_running(self, mock_hook, trigger, caplog):
-        mock_hook.return_value = mock.MagicMock(state=TransferState.RUNNING)
+        mock_hook.return_value = async_mock.MagicMock(state=TransferState.RUNNING)
 
         caplog.set_level(logging.INFO)
 
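
Both test modules now import their mock helpers from tests/providers/google/cloud/utils/compat instead of repeating a sys.version_info check per file. A plausible minimal sketch of that shim, assuming it only centralizes the asynctest/unittest.mock switch that this commit removes from the individual test files:

    # tests/providers/google/cloud/utils/compat.py -- sketch of assumed contents
    from __future__ import annotations

    import sys

    if sys.version_info < (3, 8):
        # Python 3.7 has no unittest.mock.AsyncMock; asynctest offers an equivalent.
        from asynctest import mock as async_mock
        from asynctest.mock import CoroutineMock as AsyncMock
    else:
        from unittest import mock as async_mock
        from unittest.mock import AsyncMock

    __all__ = ["AsyncMock", "async_mock"]
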
diff --git a/tests/providers/google/cloud/triggers/test_datafusion.py b/tests/providers/google/cloud/triggers/test_datafusion.py
index 907cec84fd..716ebc39a7 100644
--- a/tests/providers/google/cloud/triggers/test_datafusion.py
+++ b/tests/providers/google/cloud/triggers/test_datafusion.py
@@ -18,17 +18,12 @@ from __future__ import annotations
 
 import asyncio
 import logging
-import sys
 
 import pytest
 
 from airflow.providers.google.cloud.triggers.datafusion import DataFusionStartPipelineTrigger
 from airflow.triggers.base import TriggerEvent
-
-if sys.version_info < (3, 8):
-    from asynctest import mock
-else:
-    from unittest import mock
+from tests.providers.google.cloud.utils.compat import async_mock
 
 HOOK_STATUS_STR = "airflow.providers.google.cloud.hooks.datafusion.DataFusionAsyncHook.get_pipeline_status"
 CLASSPATH = "airflow.providers.google.cloud.triggers.datafusion.DataFusionStartPipelineTrigger"
@@ -78,7 +73,7 @@ class TestDataFusionStartPipelineTrigger:
         }
 
     @pytest.mark.asyncio
-    @mock.patch(HOOK_STATUS_STR)
+    @async_mock.patch(HOOK_STATUS_STR)
     async def test_start_pipeline_trigger_on_success_should_execute_successfully(
         self, mock_pipeline_status, trigger
     ):
@@ -94,7 +89,7 @@ class TestDataFusionStartPipelineTrigger:
         )
 
     @pytest.mark.asyncio
-    @mock.patch(HOOK_STATUS_STR)
+    @async_mock.patch(HOOK_STATUS_STR)
     async def test_start_pipeline_trigger_running_should_execute_successfully(
         self, mock_pipeline_status, trigger, caplog
     ):
@@ -117,7 +112,7 @@ class TestDataFusionStartPipelineTrigger:
         asyncio.get_event_loop().stop()
 
     @pytest.mark.asyncio
-    @mock.patch(HOOK_STATUS_STR)
+    @async_mock.patch(HOOK_STATUS_STR)
     async def test_start_pipeline_trigger_error_should_execute_successfully(
         self, mock_pipeline_status, trigger
     ):
@@ -131,7 +126,7 @@ class TestDataFusionStartPipelineTrigger:
         assert TriggerEvent({"status": "error", "message": "error"}) == actual
 
     @pytest.mark.asyncio
-    @mock.patch(HOOK_STATUS_STR)
+    @async_mock.patch(HOOK_STATUS_STR)
     async def test_start_pipeline_trigger_exception_should_execute_successfully(
         self, mock_pipeline_status, trigger
     ):

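For reference on the caplog assertions in the pending-state tests ("Query is still running..." and the "Sleeping for ... seconds." message), here is an illustrative polling loop of the kind these triggers run; it is a toy stand-in under assumed names, not the providers' actual run() implementation:

    import asyncio
    import logging

    log = logging.getLogger(__name__)


    async def poll_until_done(get_job_status, poll_interval: float) -> dict:
        # Toy polling loop: keep checking (and logging the messages the tests
        # assert on) until the job leaves the pending/running states.
        while True:
            status = await get_job_status()
            if status == "success":
                return {"status": "success", "message": "Job completed"}
            if status not in ("pending", "running"):
                return {"status": "error", "message": status}
            log.info("Query is still running...")
            log.info("Sleeping for %s seconds.", poll_interval)
            await asyncio.sleep(poll_interval)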