This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a89ba3960 Clean bigquery operator tests (#30550)
6a89ba3960 is described below

commit 6a89ba3960d1f57e67655884b97b859568b75455
Author: Pankaj Singh <[email protected]>
AuthorDate: Sat Apr 22 16:22:22 2023 +0530

    Clean bigquery operator tests (#30550)
---
 .../google/cloud/operators/test_bigquery.py        | 1198 ++++++++++----------
 1 file changed, 580 insertions(+), 618 deletions(-)

diff --git a/tests/providers/google/cloud/operators/test_bigquery.py 
b/tests/providers/google/cloud/operators/test_bigquery.py
index 1242859667..e8d51f0b93 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -18,7 +18,7 @@
 from __future__ import annotations
 
 from unittest import mock
-from unittest.mock import MagicMock
+from unittest.mock import ANY, MagicMock
 
 import pandas as pd
 import pytest
@@ -26,9 +26,6 @@ from google.cloud.bigquery import DEFAULT_RETRY
 from google.cloud.exceptions import Conflict
 
 from airflow.exceptions import AirflowException, AirflowSkipException, 
AirflowTaskTimeout, TaskDeferred
-from airflow.models import DAG
-from airflow.models.dagrun import DagRun
-from airflow.models.taskinstance import TaskInstance
 from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryCheckOperator,
     BigQueryColumnCheckOperator,
@@ -61,7 +58,6 @@ from airflow.providers.google.cloud.triggers.bigquery import (
 )
 from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.utils.timezone import datetime
-from airflow.utils.types import DagRunType
 from tests.test_utils.db import clear_db_dags, clear_db_runs, 
clear_db_serialized_dags, clear_db_xcom
 
 TASK_ID = "test-bq-generic-operator"
@@ -165,7 +161,6 @@ class TestBigQueryCreateEmptyTableOperator:
 
     
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_create_clustered_empty_table(self, mock_hook):
-
         schema_fields = [
             {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
             {"name": "date_hired", "type": "DATE", "mode": "REQUIRED"},
@@ -199,40 +194,39 @@ class TestBigQueryCreateEmptyTableOperator:
             exists_ok=False,
         )
 
-
[email protected](
-    "if_exists, is_conflict, expected_error, log_msg",
-    [
-        ("ignore", False, None, None),
-        ("log", False, None, None),
-        ("log", True, None, f"Table {TEST_DATASET}.{TEST_TABLE_ID} already 
exists."),
-        ("fail", False, None, None),
-        ("fail", True, AirflowException, None),
-        ("skip", False, None, None),
-        ("skip", True, AirflowSkipException, None),
-    ],
-)
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_create_existing_table(mock_hook, caplog, if_exists, is_conflict, 
expected_error, log_msg):
-    operator = BigQueryCreateEmptyTableOperator(
-        task_id=TASK_ID,
-        dataset_id=TEST_DATASET,
-        project_id=TEST_GCP_PROJECT_ID,
-        table_id=TEST_TABLE_ID,
-        view=VIEW_DEFINITION,
-        if_exists=if_exists,
+    @pytest.mark.parametrize(
+        "if_exists, is_conflict, expected_error, log_msg",
+        [
+            ("ignore", False, None, None),
+            ("log", False, None, None),
+            ("log", True, None, f"Table {TEST_DATASET}.{TEST_TABLE_ID} already 
exists."),
+            ("fail", False, None, None),
+            ("fail", True, AirflowException, None),
+            ("skip", False, None, None),
+            ("skip", True, AirflowSkipException, None),
+        ],
     )
-    if is_conflict:
-        mock_hook.return_value.create_empty_table.side_effect = Conflict("any")
-    else:
-        mock_hook.return_value.create_empty_table.side_effect = None
-    if expected_error is not None:
-        with pytest.raises(expected_error):
+    
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_create_existing_table(self, mock_hook, caplog, if_exists, 
is_conflict, expected_error, log_msg):
+        operator = BigQueryCreateEmptyTableOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            project_id=TEST_GCP_PROJECT_ID,
+            table_id=TEST_TABLE_ID,
+            view=VIEW_DEFINITION,
+            if_exists=if_exists,
+        )
+        if is_conflict:
+            mock_hook.return_value.create_empty_table.side_effect = 
Conflict("any")
+        else:
+            mock_hook.return_value.create_empty_table.side_effect = None
+        if expected_error is not None:
+            with pytest.raises(expected_error):
+                operator.execute(context=MagicMock())
+        else:
             operator.execute(context=MagicMock())
-    else:
-        operator.execute(context=MagicMock())
-    if log_msg is not None:
-        assert log_msg in caplog.text
+        if log_msg is not None:
+            assert log_msg in caplog.text
 
 
 class TestBigQueryCreateExternalTableOperator:
@@ -321,39 +315,38 @@ class TestBigQueryCreateEmptyDatasetOperator:
             exists_ok=False,
         )
 
-
[email protected](
-    "if_exists, is_conflict, expected_error, log_msg",
-    [
-        ("ignore", False, None, None),
-        ("log", False, None, None),
-        ("log", True, None, f"Dataset {TEST_DATASET} already exists."),
-        ("fail", False, None, None),
-        ("fail", True, AirflowException, None),
-        ("skip", False, None, None),
-        ("skip", True, AirflowSkipException, None),
-    ],
-)
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_create_empty_dataset(mock_hook, caplog, if_exists, is_conflict, 
expected_error, log_msg):
-    operator = BigQueryCreateEmptyDatasetOperator(
-        task_id=TASK_ID,
-        dataset_id=TEST_DATASET,
-        project_id=TEST_GCP_PROJECT_ID,
-        location=TEST_DATASET_LOCATION,
-        if_exists=if_exists,
+    @pytest.mark.parametrize(
+        "if_exists, is_conflict, expected_error, log_msg",
+        [
+            ("ignore", False, None, None),
+            ("log", False, None, None),
+            ("log", True, None, f"Dataset {TEST_DATASET} already exists."),
+            ("fail", False, None, None),
+            ("fail", True, AirflowException, None),
+            ("skip", False, None, None),
+            ("skip", True, AirflowSkipException, None),
+        ],
     )
-    if is_conflict:
-        mock_hook.return_value.create_empty_dataset.side_effect = 
Conflict("any")
-    else:
-        mock_hook.return_value.create_empty_dataset.side_effect = None
-    if expected_error is not None:
-        with pytest.raises(expected_error):
+    
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_create_empty_dataset(self, mock_hook, caplog, if_exists, 
is_conflict, expected_error, log_msg):
+        operator = BigQueryCreateEmptyDatasetOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            project_id=TEST_GCP_PROJECT_ID,
+            location=TEST_DATASET_LOCATION,
+            if_exists=if_exists,
+        )
+        if is_conflict:
+            mock_hook.return_value.create_empty_dataset.side_effect = 
Conflict("any")
+        else:
+            mock_hook.return_value.create_empty_dataset.side_effect = None
+        if expected_error is not None:
+            with pytest.raises(expected_error):
+                operator.execute(context=MagicMock())
+        else:
             operator.execute(context=MagicMock())
-    else:
-        operator.execute(context=MagicMock())
-    if log_msg is not None:
-        assert log_msg in caplog.text
+        if log_msg is not None:
+            assert log_msg in caplog.text
 
 
 class TestBigQueryGetDatasetOperator:
@@ -394,7 +387,6 @@ class TestBigQueryUpdateTableOperator:
 class TestBigQueryUpdateTableSchemaOperator:
     
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute(self, mock_hook):
-
         schema_field_updates = [
             {
                 "name": "emp_name",
@@ -794,7 +786,6 @@ class TestBigQueryOperator:
 class TestBigQueryGetDataOperator:
     
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute(self, mock_hook):
-
         max_results = 100
         selected_fields = "DATE"
         operator = BigQueryGetDataOperator(
@@ -816,6 +807,100 @@ class TestBigQueryGetDataOperator:
             location=TEST_DATASET_LOCATION,
         )
 
+    
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_bigquery_get_data_operator_async_with_selected_fields(
+        self, mock_hook, create_task_instance_of_operator
+    ):
+        """
+        Asserts that a task is deferred and a BigQuerygetDataTrigger will be 
fired
+        when the BigQueryGetDataOperator is executed with deferrable=True.
+        """
+        job_id = "123456"
+        hash_ = "hash"
+        real_job_id = f"{job_id}_{hash_}"
+
+        mock_hook.return_value.insert_job.return_value = 
MagicMock(job_id=real_job_id, error_result=False)
+
+        ti = create_task_instance_of_operator(
+            BigQueryGetDataOperator,
+            dag_id="dag_id",
+            task_id="get_data_from_bq",
+            dataset_id=TEST_DATASET,
+            table_id=TEST_TABLE_ID,
+            max_results=100,
+            selected_fields="value,name",
+            deferrable=True,
+        )
+
+        with pytest.raises(TaskDeferred) as exc:
+            ti.task.execute(MagicMock())
+
+        assert isinstance(
+            exc.value.trigger, BigQueryGetDataTrigger
+        ), "Trigger is not a BigQueryGetDataTrigger"
+
+    
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_bigquery_get_data_operator_async_without_selected_fields(
+        self, mock_hook, create_task_instance_of_operator
+    ):
+        """
+        Asserts that a task is deferred and a BigQueryGetDataTrigger will be 
fired
+        when the BigQueryGetDataOperator is executed with deferrable=True.
+        """
+        job_id = "123456"
+        hash_ = "hash"
+        real_job_id = f"{job_id}_{hash_}"
+
+        mock_hook.return_value.insert_job.return_value = 
MagicMock(job_id=real_job_id, error_result=False)
+
+        ti = create_task_instance_of_operator(
+            BigQueryGetDataOperator,
+            dag_id="dag_id",
+            task_id="get_data_from_bq",
+            dataset_id=TEST_DATASET,
+            table_id=TEST_TABLE_ID,
+            max_results=100,
+            deferrable=True,
+        )
+
+        with pytest.raises(TaskDeferred) as exc:
+            ti.task.execute(MagicMock())
+
+        assert isinstance(
+            exc.value.trigger, BigQueryGetDataTrigger
+        ), "Trigger is not a BigQueryGetDataTrigger"
+
+    def test_bigquery_get_data_operator_execute_failure(self):
+        """Tests that an AirflowException is raised in case of error event"""
+
+        operator = BigQueryGetDataOperator(
+            task_id="get_data_from_bq",
+            dataset_id=TEST_DATASET,
+            table_id="any",
+            max_results=100,
+            deferrable=True,
+        )
+
+        with pytest.raises(AirflowException):
+            operator.execute_complete(
+                context=None, event={"status": "error", "message": "test 
failure message"}
+            )
+
+    def test_bigquery_get_data_op_execute_complete_with_records(self):
+        """Asserts that exception is raised with correct expected exception 
message"""
+
+        operator = BigQueryGetDataOperator(
+            task_id="get_data_from_bq",
+            dataset_id=TEST_DATASET,
+            table_id="any",
+            max_results=100,
+            deferrable=True,
+        )
+
+        with mock.patch.object(operator.log, "info") as mock_log_info:
+            operator.execute_complete(context=None, event={"status": 
"success", "records": [20]})
+        mock_log_info.assert_called_with("Total extracted rows: %s", 1)
+
 
 class TestBigQueryTableDeleteOperator:
     
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -1188,622 +1273,499 @@ class TestBigQueryInsertJobOperator:
         with pytest.raises(AirflowException):
             op.execute(context=MagicMock())
 
+    
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_bigquery_insert_job_operator_async(self, mock_hook, 
create_task_instance_of_operator):
+        """
+        Asserts that a task is deferred and a BigQueryInsertJobTrigger will be 
fired
+        when the BigQueryInsertJobOperator is executed with deferrable=True.
+        """
+        job_id = "123456"
+        hash_ = "hash"
+        real_job_id = f"{job_id}_{hash_}"
 
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_bigquery_insert_job_operator_async(mock_hook):
-    """
-    Asserts that a task is deferred and a BigQueryInsertJobTrigger will be 
fired
-    when the BigQueryInsertJobOperator is executed with deferrable=True.
-    """
-    job_id = "123456"
-    hash_ = "hash"
-    real_job_id = f"{job_id}_{hash_}"
-
-    configuration = {
-        "query": {
-            "query": "SELECT * FROM any",
-            "useLegacySql": False,
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
         }
-    }
-    mock_hook.return_value.insert_job.return_value = 
MagicMock(job_id=real_job_id, error_result=False)
-
-    op = BigQueryInsertJobOperator(
-        task_id="insert_query_job",
-        configuration=configuration,
-        location=TEST_DATASET_LOCATION,
-        job_id=job_id,
-        project_id=TEST_GCP_PROJECT_ID,
-        deferrable=True,
-    )
+        mock_hook.return_value.insert_job.return_value = 
MagicMock(job_id=real_job_id, error_result=False)
 
-    with pytest.raises(TaskDeferred) as exc:
-        op.execute(create_context(op))
+        ti = create_task_instance_of_operator(
+            BigQueryInsertJobOperator,
+            dag_id="dag_id",
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID,
+            deferrable=True,
+        )
 
-    assert isinstance(
-        exc.value.trigger, BigQueryInsertJobTrigger
-    ), "Trigger is not a BigQueryInsertJobTrigger"
+        with pytest.raises(TaskDeferred) as exc:
+            ti.task.execute(MagicMock())
 
+        assert isinstance(
+            exc.value.trigger, BigQueryInsertJobTrigger
+        ), "Trigger is not a BigQueryInsertJobTrigger"
 
-def test_bigquery_insert_job_operator_execute_failure():
-    """Tests that an AirflowException is raised in case of error event"""
-    configuration = {
-        "query": {
-            "query": "SELECT * FROM any",
-            "useLegacySql": False,
+    def test_bigquery_insert_job_operator_execute_failure(self):
+        """Tests that an AirflowException is raised in case of error event"""
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
         }
-    }
-    job_id = "123456"
-
-    operator = BigQueryInsertJobOperator(
-        task_id="insert_query_job",
-        configuration=configuration,
-        location=TEST_DATASET_LOCATION,
-        job_id=job_id,
-        project_id=TEST_GCP_PROJECT_ID,
-        deferrable=True,
-    )
-
-    with pytest.raises(AirflowException):
-        operator.execute_complete(context=None, event={"status": "error", 
"message": "test failure message"})
-
+        job_id = "123456"
 
-def create_context(task):
-    dag = DAG(dag_id="dag")
-    logical_date = datetime(2022, 1, 1, 0, 0, 0)
-    dag_run = DagRun(
-        dag_id=dag.dag_id,
-        execution_date=logical_date,
-        run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
-    )
-    task_instance = TaskInstance(task=task)
-    task_instance.dag_run = dag_run
-    task_instance.dag_id = dag.dag_id
-    task_instance.xcom_push = mock.Mock()
-    return {
-        "dag": dag,
-        "run_id": dag_run.run_id,
-        "task": task,
-        "ti": task_instance,
-        "task_instance": task_instance,
-        "logical_date": logical_date,
-    }
-
-
-def test_bigquery_insert_job_operator_execute_complete():
-    """Asserts that logging occurs as expected"""
-    configuration = {
-        "query": {
-            "query": "SELECT * FROM any",
-            "useLegacySql": False,
-        }
-    }
-    job_id = "123456"
-
-    operator = BigQueryInsertJobOperator(
-        task_id="insert_query_job",
-        configuration=configuration,
-        location=TEST_DATASET_LOCATION,
-        job_id=job_id,
-        project_id=TEST_GCP_PROJECT_ID,
-        deferrable=True,
-    )
-    with mock.patch.object(operator.log, "info") as mock_log_info:
-        operator.execute_complete(
-            context=create_context(operator),
-            event={"status": "success", "message": "Job completed", "job_id": 
job_id},
+        operator = BigQueryInsertJobOperator(
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID,
+            deferrable=True,
         )
-    mock_log_info.assert_called_with("%s completed with response %s ", 
"insert_query_job", "Job completed")
 
+        with pytest.raises(AirflowException):
+            operator.execute_complete(
+                context=None, event={"status": "error", "message": "test 
failure message"}
+            )
 
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_bigquery_insert_job_operator_with_job_id_generate(mock_hook):
-    job_id = "123456"
-    hash_ = "hash"
-    real_job_id = f"{job_id}_{hash_}"
-
-    configuration = {
-        "query": {
-            "query": "SELECT * FROM any",
-            "useLegacySql": False,
+    def test_bigquery_insert_job_operator_execute_complete(self, 
create_task_instance_of_operator):
+        """Asserts that logging occurs as expected"""
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
         }
-    }
-
-    mock_hook.return_value.insert_job.side_effect = Conflict("any")
-    job = MagicMock(
-        job_id=real_job_id,
-        error_result=False,
-        state="PENDING",
-        done=lambda: False,
-    )
-    mock_hook.return_value.get_job.return_value = job
-
-    op = BigQueryInsertJobOperator(
-        task_id="insert_query_job",
-        configuration=configuration,
-        location=TEST_DATASET_LOCATION,
-        job_id=job_id,
-        project_id=TEST_GCP_PROJECT_ID,
-        reattach_states={"PENDING"},
-        deferrable=True,
-    )
-
-    with pytest.raises(TaskDeferred):
-        op.execute(create_context(op))
-
-    mock_hook.return_value.generate_job_id.assert_called_once_with(
-        job_id=job_id,
-        dag_id="adhoc_airflow",
-        task_id="insert_query_job",
-        logical_date=datetime(2022, 1, 1, 0, 0),
-        configuration=configuration,
-        force_rerun=True,
-    )
+        job_id = "123456"
 
+        ti = create_task_instance_of_operator(
+            BigQueryInsertJobOperator,
+            dag_id="dag_id",
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID,
+            deferrable=True,
+        )
+        operator = ti.task
+        with mock.patch.object(operator.log, "info") as mock_log_info:
+            operator.execute_complete(
+                context=MagicMock(),
+                event={"status": "success", "message": "Job completed", 
"job_id": job_id},
+            )
+        mock_log_info.assert_called_with(
+            "%s completed with response %s ", "insert_query_job", "Job 
completed"
+        )
 
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_execute_reattach(mock_hook):
-    job_id = "123456"
-    hash_ = "hash"
-    real_job_id = f"{job_id}_{hash_}"
-    mock_hook.return_value.generate_job_id.return_value = f"{job_id}_{hash_}"
+    
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_bigquery_insert_job_operator_with_job_id_generate(
+        self, mock_hook, create_task_instance_of_operator
+    ):
+        job_id = "123456"
+        hash_ = "hash"
+        real_job_id = f"{job_id}_{hash_}"
 
-    configuration = {
-        "query": {
-            "query": "SELECT * FROM any",
-            "useLegacySql": False,
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
         }
-    }
-
-    mock_hook.return_value.insert_job.side_effect = Conflict("any")
-    job = MagicMock(
-        job_id=real_job_id,
-        error_result=False,
-        state="PENDING",
-        done=lambda: False,
-    )
-    mock_hook.return_value.get_job.return_value = job
-
-    op = BigQueryInsertJobOperator(
-        task_id="insert_query_job",
-        configuration=configuration,
-        location=TEST_DATASET_LOCATION,
-        job_id=job_id,
-        project_id=TEST_GCP_PROJECT_ID,
-        reattach_states={"PENDING"},
-        deferrable=True,
-    )
 
-    with pytest.raises(TaskDeferred):
-        op.execute(create_context(op))
+        mock_hook.return_value.insert_job.side_effect = Conflict("any")
+        job = MagicMock(
+            job_id=real_job_id,
+            error_result=False,
+            state="PENDING",
+            done=lambda: False,
+        )
+        mock_hook.return_value.get_job.return_value = job
 
-    mock_hook.return_value.get_job.assert_called_once_with(
-        location=TEST_DATASET_LOCATION,
-        job_id=real_job_id,
-        project_id=TEST_GCP_PROJECT_ID,
-    )
+        ti = create_task_instance_of_operator(
+            BigQueryInsertJobOperator,
+            dag_id="adhoc_airflow",
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID,
+            reattach_states={"PENDING"},
+            deferrable=True,
+        )
 
-    job._begin.assert_called_once_with()
+        with pytest.raises(TaskDeferred):
+            ti.task.execute(MagicMock())
 
+        mock_hook.return_value.generate_job_id.assert_called_once_with(
+            job_id=job_id,
+            dag_id="adhoc_airflow",
+            task_id="insert_query_job",
+            logical_date=ANY,
+            configuration=configuration,
+            force_rerun=True,
+        )
 
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_execute_force_rerun_async(mock_hook):
-    job_id = "123456"
-    hash_ = "hash"
-    real_job_id = f"{job_id}_{hash_}"
-    mock_hook.return_value.generate_job_id.return_value = f"{job_id}_{hash_}"
+    
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute_force_rerun_async(self, mock_hook, 
create_task_instance_of_operator):
+        job_id = "123456"
+        hash_ = "hash"
+        real_job_id = f"{job_id}_{hash_}"
+        mock_hook.return_value.generate_job_id.return_value = 
f"{job_id}_{hash_}"
 
-    configuration = {
-        "query": {
-            "query": "SELECT * FROM any",
-            "useLegacySql": False,
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
         }
-    }
-
-    mock_hook.return_value.insert_job.side_effect = Conflict("any")
-    job = MagicMock(
-        job_id=real_job_id,
-        error_result=False,
-        state="DONE",
-        done=lambda: False,
-    )
-    mock_hook.return_value.get_job.return_value = job
-
-    op = BigQueryInsertJobOperator(
-        task_id="insert_query_job",
-        configuration=configuration,
-        location=TEST_DATASET_LOCATION,
-        job_id=job_id,
-        project_id=TEST_GCP_PROJECT_ID,
-        reattach_states={"PENDING"},
-        deferrable=True,
-    )
-
-    with pytest.raises(AirflowException) as exc:
-        op.execute(create_context(op))
-
-    expected_exception_msg = (
-        f"Job with id: {real_job_id} already exists and is in {job.state} 
state. "
-        f"If you want to force rerun it consider setting `force_rerun=True`."
-        f"Or, if you want to reattach in this scenario add {job.state} to 
`reattach_states`"
-    )
-
-    assert str(exc.value) == expected_exception_msg
-
-    mock_hook.return_value.get_job.assert_called_once_with(
-        location=TEST_DATASET_LOCATION,
-        job_id=real_job_id,
-        project_id=TEST_GCP_PROJECT_ID,
-    )
-
-
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_bigquery_check_operator_async(mock_hook):
-    """
-    Asserts that a task is deferred and a BigQueryCheckTrigger will be fired
-    when the BigQueryCheckOperator is executed with deferrable=True.
-    """
-    job_id = "123456"
-    hash_ = "hash"
-    real_job_id = f"{job_id}_{hash_}"
-
-    mock_hook.return_value.insert_job.return_value = 
MagicMock(job_id=real_job_id, error_result=False)
-
-    op = BigQueryCheckOperator(
-        task_id="bq_check_operator_job",
-        sql="SELECT * FROM any",
-        location=TEST_DATASET_LOCATION,
-        deferrable=True,
-    )
-
-    with pytest.raises(TaskDeferred) as exc:
-        op.execute(create_context(op))
-
-    assert isinstance(exc.value.trigger, BigQueryCheckTrigger), "Trigger is 
not a BigQueryCheckTrigger"
-
-
-def test_bigquery_check_operator_execute_failure():
-    """Tests that an AirflowException is raised in case of error event"""
-
-    operator = BigQueryCheckOperator(
-        task_id="bq_check_operator_execute_failure",
-        sql="SELECT * FROM any",
-        location=TEST_DATASET_LOCATION,
-        deferrable=True,
-    )
-
-    with pytest.raises(AirflowException):
-        operator.execute_complete(context=None, event={"status": "error", 
"message": "test failure message"})
-
 
-def test_bigquery_check_op_execute_complete_with_no_records():
-    """Asserts that exception is raised with correct expected exception 
message"""
-
-    operator = BigQueryCheckOperator(
-        task_id="bq_check_operator_execute_complete",
-        sql="SELECT * FROM any",
-        location=TEST_DATASET_LOCATION,
-        deferrable=True,
-    )
-
-    with pytest.raises(AirflowException) as exc:
-        operator.execute_complete(context=None, event={"status": "success", 
"records": None})
-
-    expected_exception_msg = "The query returned empty results"
-
-    assert str(exc.value) == expected_exception_msg
-
-
-def test_bigquery_check_op_execute_complete_with_non_boolean_records():
-    """Executing a sql which returns a non-boolean value should raise 
exception"""
-
-    test_sql = "SELECT * FROM any"
+        mock_hook.return_value.insert_job.side_effect = Conflict("any")
+        job = MagicMock(
+            job_id=real_job_id,
+            error_result=False,
+            state="DONE",
+            done=lambda: False,
+        )
+        mock_hook.return_value.get_job.return_value = job
 
-    operator = BigQueryCheckOperator(
-        task_id="bq_check_operator_execute_complete",
-        sql=test_sql,
-        location=TEST_DATASET_LOCATION,
-        deferrable=True,
-    )
+        ti = create_task_instance_of_operator(
+            BigQueryInsertJobOperator,
+            dag_id="dag_id",
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID,
+            reattach_states={"PENDING"},
+            deferrable=True,
+        )
 
-    expected_exception_msg = f"Test 
failed.\nQuery:\n{test_sql}\nResults:\n{[20, False]!s}"
+        with pytest.raises(AirflowException) as exc:
+            ti.task.execute(MagicMock())
 
-    with pytest.raises(AirflowException) as exc:
-        operator.execute_complete(context=None, event={"status": "success", 
"records": [20, False]})
+        expected_exception_msg = (
+            f"Job with id: {real_job_id} already exists and is in {job.state} 
state. "
+            f"If you want to force rerun it consider setting 
`force_rerun=True`."
+            f"Or, if you want to reattach in this scenario add {job.state} to 
`reattach_states`"
+        )
 
-    assert str(exc.value) == expected_exception_msg
+        assert str(exc.value) == expected_exception_msg
 
+        mock_hook.return_value.get_job.assert_called_once_with(
+            location=TEST_DATASET_LOCATION,
+            job_id=real_job_id,
+            project_id=TEST_GCP_PROJECT_ID,
+        )
 
-def test_bigquery_check_operator_execute_complete():
-    """Asserts that logging occurs as expected"""
 
-    operator = BigQueryCheckOperator(
-        task_id="bq_check_operator_execute_complete",
-        sql="SELECT * FROM any",
-        location=TEST_DATASET_LOCATION,
-        deferrable=True,
-    )
+class TestBigQueryIntervalCheckOperator:
+    def test_bigquery_interval_check_operator_execute_complete(self):
+        """Asserts that logging occurs as expected"""
 
-    with mock.patch.object(operator.log, "info") as mock_log_info:
-        operator.execute_complete(context=None, event={"status": "success", 
"records": [20]})
-    mock_log_info.assert_called_with("Success.")
+        operator = BigQueryIntervalCheckOperator(
+            task_id="bq_interval_check_operator_execute_complete",
+            table="test_table",
+            metrics_thresholds={"COUNT(*)": 1.5},
+            location=TEST_DATASET_LOCATION,
+            deferrable=True,
+        )
 
+        with mock.patch.object(operator.log, "info") as mock_log_info:
+            operator.execute_complete(context=None, event={"status": 
"success", "message": "Job completed"})
+        mock_log_info.assert_called_with(
+            "%s completed with response %s ", 
"bq_interval_check_operator_execute_complete", "Job completed"
+        )
 
-def test_bigquery_interval_check_operator_execute_complete():
-    """Asserts that logging occurs as expected"""
+    def test_bigquery_interval_check_operator_execute_failure(self):
+        """Tests that an AirflowException is raised in case of error event"""
 
-    operator = BigQueryIntervalCheckOperator(
-        task_id="bq_interval_check_operator_execute_complete",
-        table="test_table",
-        metrics_thresholds={"COUNT(*)": 1.5},
-        location=TEST_DATASET_LOCATION,
-        deferrable=True,
-    )
+        operator = BigQueryIntervalCheckOperator(
+            task_id="bq_interval_check_operator_execute_complete",
+            table="test_table",
+            metrics_thresholds={"COUNT(*)": 1.5},
+            location=TEST_DATASET_LOCATION,
+            deferrable=True,
+        )
 
-    with mock.patch.object(operator.log, "info") as mock_log_info:
-        operator.execute_complete(context=None, event={"status": "success", 
"message": "Job completed"})
-    mock_log_info.assert_called_with(
-        "%s completed with response %s ", 
"bq_interval_check_operator_execute_complete", "Job completed"
-    )
+        with pytest.raises(AirflowException):
+            operator.execute_complete(
+                context=None, event={"status": "error", "message": "test 
failure message"}
+            )
 
+    
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_bigquery_interval_check_operator_async(self, mock_hook, 
create_task_instance_of_operator):
+        """
+        Asserts that a task is deferred and a BigQueryIntervalCheckTrigger 
will be fired
+        when the BigQueryIntervalCheckOperator is executed with 
deferrable=True.
+        """
+        job_id = "123456"
+        hash_ = "hash"
+        real_job_id = f"{job_id}_{hash_}"
 
-def test_bigquery_interval_check_operator_execute_failure():
-    """Tests that an AirflowException is raised in case of error event"""
+        mock_hook.return_value.insert_job.return_value = 
MagicMock(job_id=real_job_id, error_result=False)
 
-    operator = BigQueryIntervalCheckOperator(
-        task_id="bq_interval_check_operator_execute_complete",
-        table="test_table",
-        metrics_thresholds={"COUNT(*)": 1.5},
-        location=TEST_DATASET_LOCATION,
-        deferrable=True,
-    )
+        ti = create_task_instance_of_operator(
+            BigQueryIntervalCheckOperator,
+            dag_id="dag_id",
+            task_id="bq_interval_check_operator_execute_complete",
+            table="test_table",
+            metrics_thresholds={"COUNT(*)": 1.5},
+            location=TEST_DATASET_LOCATION,
+            deferrable=True,
+        )
 
-    with pytest.raises(AirflowException):
-        operator.execute_complete(context=None, event={"status": "error", 
"message": "test failure message"})
+        with pytest.raises(TaskDeferred) as exc:
+            ti.task.execute(MagicMock())
 
+        assert isinstance(
+            exc.value.trigger, BigQueryIntervalCheckTrigger
+        ), "Trigger is not a BigQueryIntervalCheckTrigger"
 
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_bigquery_interval_check_operator_async(mock_hook):
-    """
-    Asserts that a task is deferred and a BigQueryIntervalCheckTrigger will be 
fired
-    when the BigQueryIntervalCheckOperator is executed with deferrable=True.
-    """
-    job_id = "123456"
-    hash_ = "hash"
-    real_job_id = f"{job_id}_{hash_}"
 
-    mock_hook.return_value.insert_job.return_value = 
MagicMock(job_id=real_job_id, error_result=False)
+class TestBigQueryCheckOperator:
+    
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_bigquery_check_operator_async(self, mock_hook, 
create_task_instance_of_operator):
+        """
+        Asserts that a task is deferred and a BigQueryCheckTrigger will be 
fired
+        when the BigQueryCheckOperator is executed with deferrable=True.
+        """
+        job_id = "123456"
+        hash_ = "hash"
+        real_job_id = f"{job_id}_{hash_}"
 
-    op = BigQueryIntervalCheckOperator(
-        task_id="bq_interval_check_operator_execute_complete",
-        table="test_table",
-        metrics_thresholds={"COUNT(*)": 1.5},
-        location=TEST_DATASET_LOCATION,
-        deferrable=True,
-    )
+        mock_hook.return_value.insert_job.return_value = 
MagicMock(job_id=real_job_id, error_result=False)
 
-    with pytest.raises(TaskDeferred) as exc:
-        op.execute(create_context(op))
-
-    assert isinstance(
-        exc.value.trigger, BigQueryIntervalCheckTrigger
-    ), "Trigger is not a BigQueryIntervalCheckTrigger"
-
-
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_bigquery_get_data_operator_async_with_selected_fields(mock_hook):
-    """
-    Asserts that a task is deferred and a BigQuerygetDataTrigger will be fired
-    when the BigQueryGetDataOperator is executed with deferrable=True.
-    """
-    job_id = "123456"
-    hash_ = "hash"
-    real_job_id = f"{job_id}_{hash_}"
-
-    mock_hook.return_value.insert_job.return_value = 
MagicMock(job_id=real_job_id, error_result=False)
-
-    op = BigQueryGetDataOperator(
-        task_id="get_data_from_bq",
-        dataset_id=TEST_DATASET,
-        table_id=TEST_TABLE_ID,
-        max_results=100,
-        selected_fields="value,name",
-        deferrable=True,
-    )
+        ti = create_task_instance_of_operator(
+            BigQueryCheckOperator,
+            dag_id="dag_id",
+            task_id="bq_check_operator_job",
+            sql="SELECT * FROM any",
+            location=TEST_DATASET_LOCATION,
+            deferrable=True,
+        )
 
-    with pytest.raises(TaskDeferred) as exc:
-        op.execute(create_context(op))
+        with pytest.raises(TaskDeferred) as exc:
+            ti.task.execute(MagicMock())
 
-    assert isinstance(exc.value.trigger, BigQueryGetDataTrigger), "Trigger is 
not a BigQueryGetDataTrigger"
+        assert isinstance(exc.value.trigger, BigQueryCheckTrigger), "Trigger 
is not a BigQueryCheckTrigger"
 
+    def test_bigquery_check_operator_execute_failure(self):
+        """Tests that an AirflowException is raised in case of error event"""
 
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_bigquery_get_data_operator_async_without_selected_fields(mock_hook):
-    """
-    Asserts that a task is deferred and a BigQueryGetDataTrigger will be fired
-    when the BigQueryGetDataOperator is executed with deferrable=True.
-    """
-    job_id = "123456"
-    hash_ = "hash"
-    real_job_id = f"{job_id}_{hash_}"
+        operator = BigQueryCheckOperator(
+            task_id="bq_check_operator_execute_failure",
+            sql="SELECT * FROM any",
+            location=TEST_DATASET_LOCATION,
+            deferrable=True,
+        )
 
-    mock_hook.return_value.insert_job.return_value = 
MagicMock(job_id=real_job_id, error_result=False)
+        with pytest.raises(AirflowException):
+            operator.execute_complete(
+                context=None, event={"status": "error", "message": "test 
failure message"}
+            )
 
-    op = BigQueryGetDataOperator(
-        task_id="get_data_from_bq",
-        dataset_id=TEST_DATASET,
-        table_id=TEST_TABLE_ID,
-        max_results=100,
-        deferrable=True,
-    )
+    def test_bigquery_check_op_execute_complete_with_no_records(self):
+        """Asserts that exception is raised with correct expected exception 
message"""
 
-    with pytest.raises(TaskDeferred) as exc:
-        op.execute(create_context(op))
+        operator = BigQueryCheckOperator(
+            task_id="bq_check_operator_execute_complete",
+            sql="SELECT * FROM any",
+            location=TEST_DATASET_LOCATION,
+            deferrable=True,
+        )
 
-    assert isinstance(exc.value.trigger, BigQueryGetDataTrigger), "Trigger is 
not a BigQueryGetDataTrigger"
+        with pytest.raises(AirflowException) as exc:
+            operator.execute_complete(context=None, event={"status": 
"success", "records": None})
 
+        expected_exception_msg = "The query returned empty results"
 
-def test_bigquery_get_data_operator_execute_failure():
-    """Tests that an AirflowException is raised in case of error event"""
+        assert str(exc.value) == expected_exception_msg
 
-    operator = BigQueryGetDataOperator(
-        task_id="get_data_from_bq",
-        dataset_id=TEST_DATASET,
-        table_id="any",
-        max_results=100,
-        deferrable=True,
-    )
+    def test_bigquery_check_op_execute_complete_with_non_boolean_records(self):
+        """Executing a sql which returns a non-boolean value should raise 
exception"""
 
-    with pytest.raises(AirflowException):
-        operator.execute_complete(context=None, event={"status": "error", 
"message": "test failure message"})
+        test_sql = "SELECT * FROM any"
 
+        operator = BigQueryCheckOperator(
+            task_id="bq_check_operator_execute_complete",
+            sql=test_sql,
+            location=TEST_DATASET_LOCATION,
+            deferrable=True,
+        )
 
-def test_bigquery_get_data_op_execute_complete_with_records():
-    """Asserts that exception is raised with correct expected exception 
message"""
+        expected_exception_msg = f"Test 
failed.\nQuery:\n{test_sql}\nResults:\n{[20, False]!s}"
 
-    operator = BigQueryGetDataOperator(
-        task_id="get_data_from_bq",
-        dataset_id=TEST_DATASET,
-        table_id="any",
-        max_results=100,
-        deferrable=True,
-    )
+        with pytest.raises(AirflowException) as exc:
+            operator.execute_complete(context=None, event={"status": 
"success", "records": [20, False]})
 
-    with mock.patch.object(operator.log, "info") as mock_log_info:
-        operator.execute_complete(context=None, event={"status": "success", 
"records": [20]})
-    mock_log_info.assert_called_with("Total extracted rows: %s", 1)
+        assert str(exc.value) == expected_exception_msg
 
+    def test_bigquery_check_operator_execute_complete(self):
+        """Asserts that logging occurs as expected"""
 
-def _get_value_check_async_operator(use_legacy_sql: bool = False):
-    query = "SELECT COUNT(*) FROM Any"
-    pass_val = 2
+        operator = BigQueryCheckOperator(
+            task_id="bq_check_operator_execute_complete",
+            sql="SELECT * FROM any",
+            location=TEST_DATASET_LOCATION,
+            deferrable=True,
+        )
 
-    return BigQueryValueCheckOperator(
-        task_id="check_value",
-        sql=query,
-        pass_value=pass_val,
-        use_legacy_sql=use_legacy_sql,
-        deferrable=True,
-    )
+        with mock.patch.object(operator.log, "info") as mock_log_info:
+            operator.execute_complete(context=None, event={"status": 
"success", "records": [20]})
+        mock_log_info.assert_called_with("Success.")
 
 
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_bigquery_value_check_async(mock_hook):
-    """
-    Asserts that a task is deferred and a BigQueryValueCheckTrigger will be 
fired
-    when the BigQueryValueCheckOperatorAsync is executed.
-    """
-    operator = _get_value_check_async_operator(True)
-    job_id = "123456"
-    hash_ = "hash"
-    real_job_id = f"{job_id}_{hash_}"
-    mock_hook.return_value.insert_job.return_value = 
MagicMock(job_id=real_job_id, error_result=False)
-    with pytest.raises(TaskDeferred) as exc:
-        operator.execute(create_context(operator))
-
-    assert isinstance(
-        exc.value.trigger, BigQueryValueCheckTrigger
-    ), "Trigger is not a BigQueryValueCheckTrigger"
-
-
-def test_bigquery_value_check_operator_execute_complete_success():
-    """Tests response message in case of success event"""
-    operator = _get_value_check_async_operator()
-
-    assert (
-        operator.execute_complete(context=None, event={"status": "success", 
"message": "Job completed!"})
-        is None
+class TestBigQueryValueCheckOperator:
+    
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_bigquery_value_check_async(self, mock_hook, 
create_task_instance_of_operator):
+        """
+        Asserts that a task is deferred and a BigQueryValueCheckTrigger will 
be fired
+        when the BigQueryValueCheckOperatorAsync is executed.
+        """
+        ti = create_task_instance_of_operator(
+            BigQueryValueCheckOperator,
+            dag_id="dag_id",
+            task_id="check_value",
+            sql="SELECT COUNT(*) FROM Any",
+            pass_value=2,
+            use_legacy_sql=True,
+            deferrable=True,
+        )
+        job_id = "123456"
+        hash_ = "hash"
+        real_job_id = f"{job_id}_{hash_}"
+        mock_hook.return_value.insert_job.return_value = 
MagicMock(job_id=real_job_id, error_result=False)
+        with pytest.raises(TaskDeferred) as exc:
+            ti.task.execute(MagicMock())
+
+        assert isinstance(
+            exc.value.trigger, BigQueryValueCheckTrigger
+        ), "Trigger is not a BigQueryValueCheckTrigger"
+
+    @pytest.mark.parametrize(
+        "kwargs, expected",
+        [
+            ({"sql": "SELECT COUNT(*) from Any"}, "missing keyword argument 
'pass_value'"),
+            ({"pass_value": "Any"}, "missing keyword argument 'sql'"),
+        ],
     )
+    def test_bigquery_value_check_missing_param(self, kwargs, expected):
+        """Assert the exception if require param not pass to 
BigQueryValueCheckOperatorAsync operator"""
+        with pytest.raises(AirflowException) as missing_param:
+            BigQueryValueCheckOperator(deferrable=True, **kwargs)
+        assert missing_param.value.args[0] == expected
+
+    def test_bigquery_value_check_empty(self):
+        """Assert the exception if require param not pass to 
BigQueryValueCheckOperatorAsync operator"""
+        expected, expected1 = (
+            "missing keyword arguments 'sql', 'pass_value'",
+            "missing keyword arguments 'pass_value', 'sql'",
+        )
+        with pytest.raises(AirflowException) as missing_param:
+            BigQueryValueCheckOperator(deferrable=True, kwargs={})
+        assert (missing_param.value.args[0] == expected) or 
(missing_param.value.args[0] == expected1)
+
+    def test_bigquery_value_check_operator_execute_complete_success(self):
+        """Tests response message in case of success event"""
+        operator = BigQueryValueCheckOperator(
+            task_id="check_value",
+            sql="SELECT COUNT(*) FROM Any",
+            pass_value=2,
+            use_legacy_sql=False,
+            deferrable=True,
+        )
+
+        assert (
+            operator.execute_complete(context=None, event={"status": 
"success", "message": "Job completed!"})
+            is None
+        )
+
+    def test_bigquery_value_check_operator_execute_complete_failure(self):
+        """Tests that an AirflowException is raised in case of error event"""
+        operator = BigQueryValueCheckOperator(
+            task_id="check_value",
+            sql="SELECT COUNT(*) FROM Any",
+            pass_value=2,
+            use_legacy_sql=False,
+            deferrable=True,
+        )
 
-
-def test_bigquery_value_check_operator_execute_complete_failure():
-    """Tests that an AirflowException is raised in case of error event"""
-    operator = _get_value_check_async_operator()
-
-    with pytest.raises(AirflowException):
-        operator.execute_complete(context=None, event={"status": "error", 
"message": "test failure message"})
-
-
[email protected](
-    "kwargs, expected",
-    [
-        ({"sql": "SELECT COUNT(*) from Any"}, "missing keyword argument 
'pass_value'"),
-        ({"pass_value": "Any"}, "missing keyword argument 'sql'"),
-    ],
-)
-def test_bigquery_value_check_missing_param(kwargs, expected):
-    """Assert the exception if require param not pass to 
BigQueryValueCheckOperatorAsync operator"""
-    with pytest.raises(AirflowException) as missing_param:
-        BigQueryValueCheckOperator(deferrable=True, **kwargs)
-    assert missing_param.value.args[0] == expected
-
-
-def test_bigquery_value_check_empty():
-    """Assert the exception if require param not pass to 
BigQueryValueCheckOperatorAsync operator"""
-    expected, expected1 = (
-        "missing keyword arguments 'sql', 'pass_value'",
-        "missing keyword arguments 'pass_value', 'sql'",
+        with pytest.raises(AirflowException):
+            operator.execute_complete(
+                context=None, event={"status": "error", "message": "test 
failure message"}
+            )
+
+
+class TestBigQueryColumnCheckOperator:
+    @pytest.mark.parametrize(
+        "check_type, check_value, check_result",
+        [
+            ("equal_to", 0, 0),
+            ("greater_than", 0, 1),
+            ("less_than", 0, -1),
+            ("geq_to", 0, 1),
+            ("geq_to", 0, 0),
+            ("leq_to", 0, 0),
+            ("leq_to", 0, -1),
+        ],
     )
-    with pytest.raises(AirflowException) as missing_param:
-        BigQueryValueCheckOperator(deferrable=True, kwargs={})
-    assert (missing_param.value.args[0] == expected) or 
(missing_param.value.args[0] == expected1)
-
+    
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
+    def test_bigquery_column_check_operator_succeeds(
+        self, mock_job, mock_hook, check_type, check_value, check_result, 
create_task_instance_of_operator
+    ):
+        mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
+            {"col_name": ["col1"], "check_type": ["min"], "check_result": 
[check_result]}
+        )
+        mock_hook.return_value.insert_job.return_value = mock_job
 
[email protected](
-    "check_type, check_value, check_result",
-    [
-        ("equal_to", 0, 0),
-        ("greater_than", 0, 1),
-        ("less_than", 0, -1),
-        ("geq_to", 0, 1),
-        ("geq_to", 0, 0),
-        ("leq_to", 0, 0),
-        ("leq_to", 0, -1),
-    ],
-)
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
[email protected]("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
-def test_bigquery_column_check_operator_succeeds(mock_job, mock_hook, 
check_type, check_value, check_result):
-    mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
-        {"col_name": ["col1"], "check_type": ["min"], "check_result": 
[check_result]}
-    )
-    mock_hook.return_value.insert_job.return_value = mock_job
-
-    op = BigQueryColumnCheckOperator(
-        task_id="check_column_succeeds",
-        table=TEST_TABLE_ID,
-        use_legacy_sql=False,
-        column_mapping={
-            "col1": {"min": {check_type: check_value}},
-        },
+        ti = create_task_instance_of_operator(
+            BigQueryColumnCheckOperator,
+            dag_id="dag_id",
+            task_id="check_column_succeeds",
+            table=TEST_TABLE_ID,
+            use_legacy_sql=False,
+            column_mapping={
+                "col1": {"min": {check_type: check_value}},
+            },
+        )
+        ti.task.execute(MagicMock())
+
+    @pytest.mark.parametrize(
+        "check_type, check_value, check_result",
+        [
+            ("equal_to", 0, 1),
+            ("greater_than", 0, -1),
+            ("less_than", 0, 1),
+            ("geq_to", 0, -1),
+            ("leq_to", 0, 1),
+        ],
     )
-    op.execute(create_context(op))
-
+    
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
+    def test_bigquery_column_check_operator_fails(
+        self, mock_job, mock_hook, check_type, check_value, check_result, 
create_task_instance_of_operator
+    ):
+        mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
+            {"col_name": ["col1"], "check_type": ["min"], "check_result": 
[check_result]}
+        )
+        mock_hook.return_value.insert_job.return_value = mock_job
 
[email protected](
-    "check_type, check_value, check_result",
-    [
-        ("equal_to", 0, 1),
-        ("greater_than", 0, -1),
-        ("less_than", 0, 1),
-        ("geq_to", 0, -1),
-        ("leq_to", 0, 1),
-    ],
-)
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
[email protected]("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
-def test_bigquery_column_check_operator_fails(mock_job, mock_hook, check_type, 
check_value, check_result):
-    mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
-        {"col_name": ["col1"], "check_type": ["min"], "check_result": 
[check_result]}
-    )
-    mock_hook.return_value.insert_job.return_value = mock_job
-
-    op = BigQueryColumnCheckOperator(
-        task_id="check_column_fails",
-        table=TEST_TABLE_ID,
-        use_legacy_sql=False,
-        column_mapping={
-            "col1": {"min": {check_type: check_value}},
-        },
-    )
-    with pytest.raises(AirflowException):
-        op.execute(create_context(op))
+        ti = create_task_instance_of_operator(
+            BigQueryColumnCheckOperator,
+            dag_id="dag_id",
+            task_id="check_column_fails",
+            table=TEST_TABLE_ID,
+            use_legacy_sql=False,
+            column_mapping={
+                "col1": {"min": {check_type: check_value}},
+            },
+        )
+        with pytest.raises(AirflowException):
+            ti.task.execute(MagicMock())

Reply via email to