This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6a89ba3960 Clean bigquery operator tests (#30550)
6a89ba3960 is described below
commit 6a89ba3960d1f57e67655884b97b859568b75455
Author: Pankaj Singh <[email protected]>
AuthorDate: Sat Apr 22 16:22:22 2023 +0530
Clean bigquery operator tests (#30550)
---
.../google/cloud/operators/test_bigquery.py | 1198 ++++++++++----------
1 file changed, 580 insertions(+), 618 deletions(-)
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 1242859667..e8d51f0b93 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -18,7 +18,7 @@
from __future__ import annotations
from unittest import mock
-from unittest.mock import MagicMock
+from unittest.mock import ANY, MagicMock
import pandas as pd
import pytest
@@ -26,9 +26,6 @@ from google.cloud.bigquery import DEFAULT_RETRY
from google.cloud.exceptions import Conflict
from airflow.exceptions import AirflowException, AirflowSkipException, AirflowTaskTimeout, TaskDeferred
-from airflow.models import DAG
-from airflow.models.dagrun import DagRun
-from airflow.models.taskinstance import TaskInstance
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCheckOperator,
BigQueryColumnCheckOperator,
@@ -61,7 +58,6 @@ from airflow.providers.google.cloud.triggers.bigquery import (
)
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils.timezone import datetime
-from airflow.utils.types import DagRunType
from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags, clear_db_xcom
TASK_ID = "test-bq-generic-operator"
@@ -165,7 +161,6 @@ class TestBigQueryCreateEmptyTableOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_create_clustered_empty_table(self, mock_hook):
-
schema_fields = [
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "date_hired", "type": "DATE", "mode": "REQUIRED"},
@@ -199,40 +194,39 @@ class TestBigQueryCreateEmptyTableOperator:
exists_ok=False,
)
-
[email protected](
- "if_exists, is_conflict, expected_error, log_msg",
- [
- ("ignore", False, None, None),
- ("log", False, None, None),
- ("log", True, None, f"Table {TEST_DATASET}.{TEST_TABLE_ID} already exists."),
- ("fail", False, None, None),
- ("fail", True, AirflowException, None),
- ("skip", False, None, None),
- ("skip", True, AirflowSkipException, None),
- ],
-)
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_create_existing_table(mock_hook, caplog, if_exists, is_conflict, expected_error, log_msg):
- operator = BigQueryCreateEmptyTableOperator(
- task_id=TASK_ID,
- dataset_id=TEST_DATASET,
- project_id=TEST_GCP_PROJECT_ID,
- table_id=TEST_TABLE_ID,
- view=VIEW_DEFINITION,
- if_exists=if_exists,
+ @pytest.mark.parametrize(
+ "if_exists, is_conflict, expected_error, log_msg",
+ [
+ ("ignore", False, None, None),
+ ("log", False, None, None),
+ ("log", True, None, f"Table {TEST_DATASET}.{TEST_TABLE_ID} already exists."),
+ ("fail", False, None, None),
+ ("fail", True, AirflowException, None),
+ ("skip", False, None, None),
+ ("skip", True, AirflowSkipException, None),
+ ],
+ )
- if is_conflict:
- mock_hook.return_value.create_empty_table.side_effect = Conflict("any")
- else:
- mock_hook.return_value.create_empty_table.side_effect = None
- if expected_error is not None:
- with pytest.raises(expected_error):
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ def test_create_existing_table(self, mock_hook, caplog, if_exists, is_conflict, expected_error, log_msg):
+ operator = BigQueryCreateEmptyTableOperator(
+ task_id=TASK_ID,
+ dataset_id=TEST_DATASET,
+ project_id=TEST_GCP_PROJECT_ID,
+ table_id=TEST_TABLE_ID,
+ view=VIEW_DEFINITION,
+ if_exists=if_exists,
+ )
+ if is_conflict:
+ mock_hook.return_value.create_empty_table.side_effect = Conflict("any")
+ else:
+ mock_hook.return_value.create_empty_table.side_effect = None
+ if expected_error is not None:
+ with pytest.raises(expected_error):
+ operator.execute(context=MagicMock())
+ else:
operator.execute(context=MagicMock())
- else:
- operator.execute(context=MagicMock())
- if log_msg is not None:
- assert log_msg in caplog.text
+ if log_msg is not None:
+ assert log_msg in caplog.text
class TestBigQueryCreateExternalTableOperator:
@@ -321,39 +315,38 @@ class TestBigQueryCreateEmptyDatasetOperator:
exists_ok=False,
)
-
[email protected](
- "if_exists, is_conflict, expected_error, log_msg",
- [
- ("ignore", False, None, None),
- ("log", False, None, None),
- ("log", True, None, f"Dataset {TEST_DATASET} already exists."),
- ("fail", False, None, None),
- ("fail", True, AirflowException, None),
- ("skip", False, None, None),
- ("skip", True, AirflowSkipException, None),
- ],
-)
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_create_empty_dataset(mock_hook, caplog, if_exists, is_conflict, expected_error, log_msg):
- operator = BigQueryCreateEmptyDatasetOperator(
- task_id=TASK_ID,
- dataset_id=TEST_DATASET,
- project_id=TEST_GCP_PROJECT_ID,
- location=TEST_DATASET_LOCATION,
- if_exists=if_exists,
+ @pytest.mark.parametrize(
+ "if_exists, is_conflict, expected_error, log_msg",
+ [
+ ("ignore", False, None, None),
+ ("log", False, None, None),
+ ("log", True, None, f"Dataset {TEST_DATASET} already exists."),
+ ("fail", False, None, None),
+ ("fail", True, AirflowException, None),
+ ("skip", False, None, None),
+ ("skip", True, AirflowSkipException, None),
+ ],
+ )
- if is_conflict:
- mock_hook.return_value.create_empty_dataset.side_effect = Conflict("any")
- else:
- mock_hook.return_value.create_empty_dataset.side_effect = None
- if expected_error is not None:
- with pytest.raises(expected_error):
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ def test_create_empty_dataset(self, mock_hook, caplog, if_exists, is_conflict, expected_error, log_msg):
+ operator = BigQueryCreateEmptyDatasetOperator(
+ task_id=TASK_ID,
+ dataset_id=TEST_DATASET,
+ project_id=TEST_GCP_PROJECT_ID,
+ location=TEST_DATASET_LOCATION,
+ if_exists=if_exists,
+ )
+ if is_conflict:
+ mock_hook.return_value.create_empty_dataset.side_effect = Conflict("any")
+ else:
+ mock_hook.return_value.create_empty_dataset.side_effect = None
+ if expected_error is not None:
+ with pytest.raises(expected_error):
+ operator.execute(context=MagicMock())
+ else:
operator.execute(context=MagicMock())
- else:
- operator.execute(context=MagicMock())
- if log_msg is not None:
- assert log_msg in caplog.text
+ if log_msg is not None:
+ assert log_msg in caplog.text
class TestBigQueryGetDatasetOperator:
@@ -394,7 +387,6 @@ class TestBigQueryUpdateTableOperator:
class TestBigQueryUpdateTableSchemaOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_execute(self, mock_hook):
-
schema_field_updates = [
{
"name": "emp_name",
@@ -794,7 +786,6 @@ class TestBigQueryOperator:
class TestBigQueryGetDataOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_execute(self, mock_hook):
-
max_results = 100
selected_fields = "DATE"
operator = BigQueryGetDataOperator(
@@ -816,6 +807,100 @@ class TestBigQueryGetDataOperator:
location=TEST_DATASET_LOCATION,
)
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ def test_bigquery_get_data_operator_async_with_selected_fields(
+ self, mock_hook, create_task_instance_of_operator
+ ):
+ """
+ Asserts that a task is deferred and a BigQueryGetDataTrigger will be fired
+ when the BigQueryGetDataOperator is executed with deferrable=True.
+ """
+ job_id = "123456"
+ hash_ = "hash"
+ real_job_id = f"{job_id}_{hash_}"
+
+ mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+
+ ti = create_task_instance_of_operator(
+ BigQueryGetDataOperator,
+ dag_id="dag_id",
+ task_id="get_data_from_bq",
+ dataset_id=TEST_DATASET,
+ table_id=TEST_TABLE_ID,
+ max_results=100,
+ selected_fields="value,name",
+ deferrable=True,
+ )
+
+ with pytest.raises(TaskDeferred) as exc:
+ ti.task.execute(MagicMock())
+
+ assert isinstance(
+ exc.value.trigger, BigQueryGetDataTrigger
+ ), "Trigger is not a BigQueryGetDataTrigger"
+
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ def test_bigquery_get_data_operator_async_without_selected_fields(
+ self, mock_hook, create_task_instance_of_operator
+ ):
+ """
+ Asserts that a task is deferred and a BigQueryGetDataTrigger will be fired
+ when the BigQueryGetDataOperator is executed with deferrable=True.
+ """
+ job_id = "123456"
+ hash_ = "hash"
+ real_job_id = f"{job_id}_{hash_}"
+
+ mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+
+ ti = create_task_instance_of_operator(
+ BigQueryGetDataOperator,
+ dag_id="dag_id",
+ task_id="get_data_from_bq",
+ dataset_id=TEST_DATASET,
+ table_id=TEST_TABLE_ID,
+ max_results=100,
+ deferrable=True,
+ )
+
+ with pytest.raises(TaskDeferred) as exc:
+ ti.task.execute(MagicMock())
+
+ assert isinstance(
+ exc.value.trigger, BigQueryGetDataTrigger
+ ), "Trigger is not a BigQueryGetDataTrigger"
+
+ def test_bigquery_get_data_operator_execute_failure(self):
+ """Tests that an AirflowException is raised in case of error event"""
+
+ operator = BigQueryGetDataOperator(
+ task_id="get_data_from_bq",
+ dataset_id=TEST_DATASET,
+ table_id="any",
+ max_results=100,
+ deferrable=True,
+ )
+
+ with pytest.raises(AirflowException):
+ operator.execute_complete(
+ context=None, event={"status": "error", "message": "test failure message"}
+ )
+
+ def test_bigquery_get_data_op_execute_complete_with_records(self):
+ """Asserts that the expected row count is logged when execute_complete receives records"""
+
+ operator = BigQueryGetDataOperator(
+ task_id="get_data_from_bq",
+ dataset_id=TEST_DATASET,
+ table_id="any",
+ max_results=100,
+ deferrable=True,
+ )
+
+ with mock.patch.object(operator.log, "info") as mock_log_info:
+ operator.execute_complete(context=None, event={"status": "success", "records": [20]})
+ mock_log_info.assert_called_with("Total extracted rows: %s", 1)
+
class TestBigQueryTableDeleteOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -1188,622 +1273,499 @@ class TestBigQueryInsertJobOperator:
with pytest.raises(AirflowException):
op.execute(context=MagicMock())
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ def test_bigquery_insert_job_operator_async(self, mock_hook, create_task_instance_of_operator):
+ """
+ Asserts that a task is deferred and a BigQueryInsertJobTrigger will be fired
+ when the BigQueryInsertJobOperator is executed with deferrable=True.
+ """
+ job_id = "123456"
+ hash_ = "hash"
+ real_job_id = f"{job_id}_{hash_}"
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_bigquery_insert_job_operator_async(mock_hook):
- """
- Asserts that a task is deferred and a BigQueryInsertJobTrigger will be fired
- when the BigQueryInsertJobOperator is executed with deferrable=True.
- """
- job_id = "123456"
- hash_ = "hash"
- real_job_id = f"{job_id}_{hash_}"
-
- configuration = {
- "query": {
- "query": "SELECT * FROM any",
- "useLegacySql": False,
+ configuration = {
+ "query": {
+ "query": "SELECT * FROM any",
+ "useLegacySql": False,
+ }
+ }
- }
- mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
-
- op = BigQueryInsertJobOperator(
- task_id="insert_query_job",
- configuration=configuration,
- location=TEST_DATASET_LOCATION,
- job_id=job_id,
- project_id=TEST_GCP_PROJECT_ID,
- deferrable=True,
- )
+ mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
- with pytest.raises(TaskDeferred) as exc:
- op.execute(create_context(op))
+ ti = create_task_instance_of_operator(
+ BigQueryInsertJobOperator,
+ dag_id="dag_id",
+ task_id="insert_query_job",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ job_id=job_id,
+ project_id=TEST_GCP_PROJECT_ID,
+ deferrable=True,
+ )
- assert isinstance(
- exc.value.trigger, BigQueryInsertJobTrigger
- ), "Trigger is not a BigQueryInsertJobTrigger"
+ with pytest.raises(TaskDeferred) as exc:
+ ti.task.execute(MagicMock())
+ assert isinstance(
+ exc.value.trigger, BigQueryInsertJobTrigger
+ ), "Trigger is not a BigQueryInsertJobTrigger"
-def test_bigquery_insert_job_operator_execute_failure():
- """Tests that an AirflowException is raised in case of error event"""
- configuration = {
- "query": {
- "query": "SELECT * FROM any",
- "useLegacySql": False,
+ def test_bigquery_insert_job_operator_execute_failure(self):
+ """Tests that an AirflowException is raised in case of error event"""
+ configuration = {
+ "query": {
+ "query": "SELECT * FROM any",
+ "useLegacySql": False,
+ }
+ }
- }
- job_id = "123456"
-
- operator = BigQueryInsertJobOperator(
- task_id="insert_query_job",
- configuration=configuration,
- location=TEST_DATASET_LOCATION,
- job_id=job_id,
- project_id=TEST_GCP_PROJECT_ID,
- deferrable=True,
- )
-
- with pytest.raises(AirflowException):
- operator.execute_complete(context=None, event={"status": "error",
"message": "test failure message"})
-
+ job_id = "123456"
-def create_context(task):
- dag = DAG(dag_id="dag")
- logical_date = datetime(2022, 1, 1, 0, 0, 0)
- dag_run = DagRun(
- dag_id=dag.dag_id,
- execution_date=logical_date,
- run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
- )
- task_instance = TaskInstance(task=task)
- task_instance.dag_run = dag_run
- task_instance.dag_id = dag.dag_id
- task_instance.xcom_push = mock.Mock()
- return {
- "dag": dag,
- "run_id": dag_run.run_id,
- "task": task,
- "ti": task_instance,
- "task_instance": task_instance,
- "logical_date": logical_date,
- }
-
-
-def test_bigquery_insert_job_operator_execute_complete():
- """Asserts that logging occurs as expected"""
- configuration = {
- "query": {
- "query": "SELECT * FROM any",
- "useLegacySql": False,
- }
- }
- job_id = "123456"
-
- operator = BigQueryInsertJobOperator(
- task_id="insert_query_job",
- configuration=configuration,
- location=TEST_DATASET_LOCATION,
- job_id=job_id,
- project_id=TEST_GCP_PROJECT_ID,
- deferrable=True,
- )
- with mock.patch.object(operator.log, "info") as mock_log_info:
- operator.execute_complete(
- context=create_context(operator),
- event={"status": "success", "message": "Job completed", "job_id": job_id},
+ operator = BigQueryInsertJobOperator(
+ task_id="insert_query_job",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ job_id=job_id,
+ project_id=TEST_GCP_PROJECT_ID,
+ deferrable=True,
+ )
- mock_log_info.assert_called_with("%s completed with response %s ", "insert_query_job", "Job completed")
+ with pytest.raises(AirflowException):
+ operator.execute_complete(
+ context=None, event={"status": "error", "message": "test failure message"}
+ )
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_bigquery_insert_job_operator_with_job_id_generate(mock_hook):
- job_id = "123456"
- hash_ = "hash"
- real_job_id = f"{job_id}_{hash_}"
-
- configuration = {
- "query": {
- "query": "SELECT * FROM any",
- "useLegacySql": False,
+ def test_bigquery_insert_job_operator_execute_complete(self, create_task_instance_of_operator):
+ """Asserts that logging occurs as expected"""
+ configuration = {
+ "query": {
+ "query": "SELECT * FROM any",
+ "useLegacySql": False,
+ }
}
- }
-
- mock_hook.return_value.insert_job.side_effect = Conflict("any")
- job = MagicMock(
- job_id=real_job_id,
- error_result=False,
- state="PENDING",
- done=lambda: False,
- )
- mock_hook.return_value.get_job.return_value = job
-
- op = BigQueryInsertJobOperator(
- task_id="insert_query_job",
- configuration=configuration,
- location=TEST_DATASET_LOCATION,
- job_id=job_id,
- project_id=TEST_GCP_PROJECT_ID,
- reattach_states={"PENDING"},
- deferrable=True,
- )
-
- with pytest.raises(TaskDeferred):
- op.execute(create_context(op))
-
- mock_hook.return_value.generate_job_id.assert_called_once_with(
- job_id=job_id,
- dag_id="adhoc_airflow",
- task_id="insert_query_job",
- logical_date=datetime(2022, 1, 1, 0, 0),
- configuration=configuration,
- force_rerun=True,
- )
+ job_id = "123456"
+ ti = create_task_instance_of_operator(
+ BigQueryInsertJobOperator,
+ dag_id="dag_id",
+ task_id="insert_query_job",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ job_id=job_id,
+ project_id=TEST_GCP_PROJECT_ID,
+ deferrable=True,
+ )
+ operator = ti.task
+ with mock.patch.object(operator.log, "info") as mock_log_info:
+ operator.execute_complete(
+ context=MagicMock(),
+ event={"status": "success", "message": "Job completed", "job_id": job_id},
+ )
+ mock_log_info.assert_called_with(
+ "%s completed with response %s ", "insert_query_job", "Job completed"
+ )
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_execute_reattach(mock_hook):
- job_id = "123456"
- hash_ = "hash"
- real_job_id = f"{job_id}_{hash_}"
- mock_hook.return_value.generate_job_id.return_value = f"{job_id}_{hash_}"
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ def test_bigquery_insert_job_operator_with_job_id_generate(
+ self, mock_hook, create_task_instance_of_operator
+ ):
+ job_id = "123456"
+ hash_ = "hash"
+ real_job_id = f"{job_id}_{hash_}"
- configuration = {
- "query": {
- "query": "SELECT * FROM any",
- "useLegacySql": False,
+ configuration = {
+ "query": {
+ "query": "SELECT * FROM any",
+ "useLegacySql": False,
+ }
+ }
- }
-
- mock_hook.return_value.insert_job.side_effect = Conflict("any")
- job = MagicMock(
- job_id=real_job_id,
- error_result=False,
- state="PENDING",
- done=lambda: False,
- )
- mock_hook.return_value.get_job.return_value = job
-
- op = BigQueryInsertJobOperator(
- task_id="insert_query_job",
- configuration=configuration,
- location=TEST_DATASET_LOCATION,
- job_id=job_id,
- project_id=TEST_GCP_PROJECT_ID,
- reattach_states={"PENDING"},
- deferrable=True,
- )
- with pytest.raises(TaskDeferred):
- op.execute(create_context(op))
+ mock_hook.return_value.insert_job.side_effect = Conflict("any")
+ job = MagicMock(
+ job_id=real_job_id,
+ error_result=False,
+ state="PENDING",
+ done=lambda: False,
+ )
+ mock_hook.return_value.get_job.return_value = job
- mock_hook.return_value.get_job.assert_called_once_with(
- location=TEST_DATASET_LOCATION,
- job_id=real_job_id,
- project_id=TEST_GCP_PROJECT_ID,
- )
+ ti = create_task_instance_of_operator(
+ BigQueryInsertJobOperator,
+ dag_id="adhoc_airflow",
+ task_id="insert_query_job",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ job_id=job_id,
+ project_id=TEST_GCP_PROJECT_ID,
+ reattach_states={"PENDING"},
+ deferrable=True,
+ )
- job._begin.assert_called_once_with()
+ with pytest.raises(TaskDeferred):
+ ti.task.execute(MagicMock())
+ mock_hook.return_value.generate_job_id.assert_called_once_with(
+ job_id=job_id,
+ dag_id="adhoc_airflow",
+ task_id="insert_query_job",
+ logical_date=ANY,
+ configuration=configuration,
+ force_rerun=True,
+ )
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_execute_force_rerun_async(mock_hook):
- job_id = "123456"
- hash_ = "hash"
- real_job_id = f"{job_id}_{hash_}"
- mock_hook.return_value.generate_job_id.return_value = f"{job_id}_{hash_}"
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ def test_execute_force_rerun_async(self, mock_hook, create_task_instance_of_operator):
+ job_id = "123456"
+ hash_ = "hash"
+ real_job_id = f"{job_id}_{hash_}"
+ mock_hook.return_value.generate_job_id.return_value = f"{job_id}_{hash_}"
- configuration = {
- "query": {
- "query": "SELECT * FROM any",
- "useLegacySql": False,
+ configuration = {
+ "query": {
+ "query": "SELECT * FROM any",
+ "useLegacySql": False,
+ }
+ }
- }
-
- mock_hook.return_value.insert_job.side_effect = Conflict("any")
- job = MagicMock(
- job_id=real_job_id,
- error_result=False,
- state="DONE",
- done=lambda: False,
- )
- mock_hook.return_value.get_job.return_value = job
-
- op = BigQueryInsertJobOperator(
- task_id="insert_query_job",
- configuration=configuration,
- location=TEST_DATASET_LOCATION,
- job_id=job_id,
- project_id=TEST_GCP_PROJECT_ID,
- reattach_states={"PENDING"},
- deferrable=True,
- )
-
- with pytest.raises(AirflowException) as exc:
- op.execute(create_context(op))
-
- expected_exception_msg = (
- f"Job with id: {real_job_id} already exists and is in {job.state} state. "
- f"If you want to force rerun it consider setting `force_rerun=True`."
- f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
- )
-
- assert str(exc.value) == expected_exception_msg
-
- mock_hook.return_value.get_job.assert_called_once_with(
- location=TEST_DATASET_LOCATION,
- job_id=real_job_id,
- project_id=TEST_GCP_PROJECT_ID,
- )
-
-
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_bigquery_check_operator_async(mock_hook):
- """
- Asserts that a task is deferred and a BigQueryCheckTrigger will be fired
- when the BigQueryCheckOperator is executed with deferrable=True.
- """
- job_id = "123456"
- hash_ = "hash"
- real_job_id = f"{job_id}_{hash_}"
-
- mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
-
- op = BigQueryCheckOperator(
- task_id="bq_check_operator_job",
- sql="SELECT * FROM any",
- location=TEST_DATASET_LOCATION,
- deferrable=True,
- )
-
- with pytest.raises(TaskDeferred) as exc:
- op.execute(create_context(op))
-
- assert isinstance(exc.value.trigger, BigQueryCheckTrigger), "Trigger is not a BigQueryCheckTrigger"
-
-
-def test_bigquery_check_operator_execute_failure():
- """Tests that an AirflowException is raised in case of error event"""
-
- operator = BigQueryCheckOperator(
- task_id="bq_check_operator_execute_failure",
- sql="SELECT * FROM any",
- location=TEST_DATASET_LOCATION,
- deferrable=True,
- )
-
- with pytest.raises(AirflowException):
- operator.execute_complete(context=None, event={"status": "error", "message": "test failure message"})
-
-def test_bigquery_check_op_execute_complete_with_no_records():
- """Asserts that exception is raised with correct expected exception message"""
-
- operator = BigQueryCheckOperator(
- task_id="bq_check_operator_execute_complete",
- sql="SELECT * FROM any",
- location=TEST_DATASET_LOCATION,
- deferrable=True,
- )
-
- with pytest.raises(AirflowException) as exc:
- operator.execute_complete(context=None, event={"status": "success", "records": None})
-
- expected_exception_msg = "The query returned empty results"
-
- assert str(exc.value) == expected_exception_msg
-
-
-def test_bigquery_check_op_execute_complete_with_non_boolean_records():
- """Executing a sql which returns a non-boolean value should raise exception"""
-
- test_sql = "SELECT * FROM any"
+ mock_hook.return_value.insert_job.side_effect = Conflict("any")
+ job = MagicMock(
+ job_id=real_job_id,
+ error_result=False,
+ state="DONE",
+ done=lambda: False,
+ )
+ mock_hook.return_value.get_job.return_value = job
- operator = BigQueryCheckOperator(
- task_id="bq_check_operator_execute_complete",
- sql=test_sql,
- location=TEST_DATASET_LOCATION,
- deferrable=True,
- )
+ ti = create_task_instance_of_operator(
+ BigQueryInsertJobOperator,
+ dag_id="dag_id",
+ task_id="insert_query_job",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ job_id=job_id,
+ project_id=TEST_GCP_PROJECT_ID,
+ reattach_states={"PENDING"},
+ deferrable=True,
+ )
- expected_exception_msg = f"Test failed.\nQuery:\n{test_sql}\nResults:\n{[20, False]!s}"
+ with pytest.raises(AirflowException) as exc:
+ ti.task.execute(MagicMock())
- with pytest.raises(AirflowException) as exc:
- operator.execute_complete(context=None, event={"status": "success", "records": [20, False]})
+ expected_exception_msg = (
+ f"Job with id: {real_job_id} already exists and is in {job.state} state. "
+ f"If you want to force rerun it consider setting `force_rerun=True`."
+ f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
+ )
- assert str(exc.value) == expected_exception_msg
+ assert str(exc.value) == expected_exception_msg
+ mock_hook.return_value.get_job.assert_called_once_with(
+ location=TEST_DATASET_LOCATION,
+ job_id=real_job_id,
+ project_id=TEST_GCP_PROJECT_ID,
+ )
-def test_bigquery_check_operator_execute_complete():
- """Asserts that logging occurs as expected"""
- operator = BigQueryCheckOperator(
- task_id="bq_check_operator_execute_complete",
- sql="SELECT * FROM any",
- location=TEST_DATASET_LOCATION,
- deferrable=True,
- )
+class TestBigQueryIntervalCheckOperator:
+ def test_bigquery_interval_check_operator_execute_complete(self):
+ """Asserts that logging occurs as expected"""
- with mock.patch.object(operator.log, "info") as mock_log_info:
- operator.execute_complete(context=None, event={"status": "success", "records": [20]})
- mock_log_info.assert_called_with("Success.")
+ operator = BigQueryIntervalCheckOperator(
+ task_id="bq_interval_check_operator_execute_complete",
+ table="test_table",
+ metrics_thresholds={"COUNT(*)": 1.5},
+ location=TEST_DATASET_LOCATION,
+ deferrable=True,
+ )
+ with mock.patch.object(operator.log, "info") as mock_log_info:
+ operator.execute_complete(context=None, event={"status": "success", "message": "Job completed"})
+ mock_log_info.assert_called_with(
+ "%s completed with response %s ", "bq_interval_check_operator_execute_complete", "Job completed"
+ )
-def test_bigquery_interval_check_operator_execute_complete():
- """Asserts that logging occurs as expected"""
+ def test_bigquery_interval_check_operator_execute_failure(self):
+ """Tests that an AirflowException is raised in case of error event"""
- operator = BigQueryIntervalCheckOperator(
- task_id="bq_interval_check_operator_execute_complete",
- table="test_table",
- metrics_thresholds={"COUNT(*)": 1.5},
- location=TEST_DATASET_LOCATION,
- deferrable=True,
- )
+ operator = BigQueryIntervalCheckOperator(
+ task_id="bq_interval_check_operator_execute_complete",
+ table="test_table",
+ metrics_thresholds={"COUNT(*)": 1.5},
+ location=TEST_DATASET_LOCATION,
+ deferrable=True,
+ )
- with mock.patch.object(operator.log, "info") as mock_log_info:
- operator.execute_complete(context=None, event={"status": "success", "message": "Job completed"})
- mock_log_info.assert_called_with(
- "%s completed with response %s ", "bq_interval_check_operator_execute_complete", "Job completed"
- )
+ with pytest.raises(AirflowException):
+ operator.execute_complete(
+ context=None, event={"status": "error", "message": "test failure message"}
+ )
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ def test_bigquery_interval_check_operator_async(self, mock_hook, create_task_instance_of_operator):
+ """
+ Asserts that a task is deferred and a BigQueryIntervalCheckTrigger will be fired
+ when the BigQueryIntervalCheckOperator is executed with deferrable=True.
+ """
+ job_id = "123456"
+ hash_ = "hash"
+ real_job_id = f"{job_id}_{hash_}"
-def test_bigquery_interval_check_operator_execute_failure():
- """Tests that an AirflowException is raised in case of error event"""
+ mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
- operator = BigQueryIntervalCheckOperator(
- task_id="bq_interval_check_operator_execute_complete",
- table="test_table",
- metrics_thresholds={"COUNT(*)": 1.5},
- location=TEST_DATASET_LOCATION,
- deferrable=True,
- )
+ ti = create_task_instance_of_operator(
+ BigQueryIntervalCheckOperator,
+ dag_id="dag_id",
+ task_id="bq_interval_check_operator_execute_complete",
+ table="test_table",
+ metrics_thresholds={"COUNT(*)": 1.5},
+ location=TEST_DATASET_LOCATION,
+ deferrable=True,
+ )
- with pytest.raises(AirflowException):
- operator.execute_complete(context=None, event={"status": "error", "message": "test failure message"})
+ with pytest.raises(TaskDeferred) as exc:
+ ti.task.execute(MagicMock())
+ assert isinstance(
+ exc.value.trigger, BigQueryIntervalCheckTrigger
+ ), "Trigger is not a BigQueryIntervalCheckTrigger"
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_bigquery_interval_check_operator_async(mock_hook):
- """
- Asserts that a task is deferred and a BigQueryIntervalCheckTrigger will be fired
- when the BigQueryIntervalCheckOperator is executed with deferrable=True.
- """
- job_id = "123456"
- hash_ = "hash"
- real_job_id = f"{job_id}_{hash_}"
- mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+class TestBigQueryCheckOperator:
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ def test_bigquery_check_operator_async(self, mock_hook, create_task_instance_of_operator):
+ """
+ Asserts that a task is deferred and a BigQueryCheckTrigger will be fired
+ when the BigQueryCheckOperator is executed with deferrable=True.
+ """
+ job_id = "123456"
+ hash_ = "hash"
+ real_job_id = f"{job_id}_{hash_}"
- op = BigQueryIntervalCheckOperator(
- task_id="bq_interval_check_operator_execute_complete",
- table="test_table",
- metrics_thresholds={"COUNT(*)": 1.5},
- location=TEST_DATASET_LOCATION,
- deferrable=True,
- )
+ mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
- with pytest.raises(TaskDeferred) as exc:
- op.execute(create_context(op))
-
- assert isinstance(
- exc.value.trigger, BigQueryIntervalCheckTrigger
- ), "Trigger is not a BigQueryIntervalCheckTrigger"
-
-
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_bigquery_get_data_operator_async_with_selected_fields(mock_hook):
- """
- Asserts that a task is deferred and a BigQuerygetDataTrigger will be fired
- when the BigQueryGetDataOperator is executed with deferrable=True.
- """
- job_id = "123456"
- hash_ = "hash"
- real_job_id = f"{job_id}_{hash_}"
-
- mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
-
- op = BigQueryGetDataOperator(
- task_id="get_data_from_bq",
- dataset_id=TEST_DATASET,
- table_id=TEST_TABLE_ID,
- max_results=100,
- selected_fields="value,name",
- deferrable=True,
- )
+ ti = create_task_instance_of_operator(
+ BigQueryCheckOperator,
+ dag_id="dag_id",
+ task_id="bq_check_operator_job",
+ sql="SELECT * FROM any",
+ location=TEST_DATASET_LOCATION,
+ deferrable=True,
+ )
- with pytest.raises(TaskDeferred) as exc:
- op.execute(create_context(op))
+ with pytest.raises(TaskDeferred) as exc:
+ ti.task.execute(MagicMock())
- assert isinstance(exc.value.trigger, BigQueryGetDataTrigger), "Trigger is not a BigQueryGetDataTrigger"
+ assert isinstance(exc.value.trigger, BigQueryCheckTrigger), "Trigger is not a BigQueryCheckTrigger"
+ def test_bigquery_check_operator_execute_failure(self):
+ """Tests that an AirflowException is raised in case of error event"""
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_bigquery_get_data_operator_async_without_selected_fields(mock_hook):
- """
- Asserts that a task is deferred and a BigQueryGetDataTrigger will be fired
- when the BigQueryGetDataOperator is executed with deferrable=True.
- """
- job_id = "123456"
- hash_ = "hash"
- real_job_id = f"{job_id}_{hash_}"
+ operator = BigQueryCheckOperator(
+ task_id="bq_check_operator_execute_failure",
+ sql="SELECT * FROM any",
+ location=TEST_DATASET_LOCATION,
+ deferrable=True,
+ )
- mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+ with pytest.raises(AirflowException):
+ operator.execute_complete(
+ context=None, event={"status": "error", "message": "test failure message"}
+ )
- op = BigQueryGetDataOperator(
- task_id="get_data_from_bq",
- dataset_id=TEST_DATASET,
- table_id=TEST_TABLE_ID,
- max_results=100,
- deferrable=True,
- )
+ def test_bigquery_check_op_execute_complete_with_no_records(self):
+ """Asserts that exception is raised with correct expected exception message"""
- with pytest.raises(TaskDeferred) as exc:
- op.execute(create_context(op))
+ operator = BigQueryCheckOperator(
+ task_id="bq_check_operator_execute_complete",
+ sql="SELECT * FROM any",
+ location=TEST_DATASET_LOCATION,
+ deferrable=True,
+ )
- assert isinstance(exc.value.trigger, BigQueryGetDataTrigger), "Trigger is
not a BigQueryGetDataTrigger"
+ with pytest.raises(AirflowException) as exc:
+ operator.execute_complete(context=None, event={"status": "success", "records": None})
+ expected_exception_msg = "The query returned empty results"
-def test_bigquery_get_data_operator_execute_failure():
- """Tests that an AirflowException is raised in case of error event"""
+ assert str(exc.value) == expected_exception_msg
- operator = BigQueryGetDataOperator(
- task_id="get_data_from_bq",
- dataset_id=TEST_DATASET,
- table_id="any",
- max_results=100,
- deferrable=True,
- )
+ def test_bigquery_check_op_execute_complete_with_non_boolean_records(self):
+ """Executing SQL which returns a non-boolean value should raise an exception"""
- with pytest.raises(AirflowException):
- operator.execute_complete(context=None, event={"status": "error", "message": "test failure message"})
+ test_sql = "SELECT * FROM any"
+ operator = BigQueryCheckOperator(
+ task_id="bq_check_operator_execute_complete",
+ sql=test_sql,
+ location=TEST_DATASET_LOCATION,
+ deferrable=True,
+ )
-def test_bigquery_get_data_op_execute_complete_with_records():
- """Asserts that exception is raised with correct expected exception message"""
+ expected_exception_msg = f"Test failed.\nQuery:\n{test_sql}\nResults:\n{[20, False]!s}"
- operator = BigQueryGetDataOperator(
- task_id="get_data_from_bq",
- dataset_id=TEST_DATASET,
- table_id="any",
- max_results=100,
- deferrable=True,
- )
+ with pytest.raises(AirflowException) as exc:
+ operator.execute_complete(context=None, event={"status": "success", "records": [20, False]})
- with mock.patch.object(operator.log, "info") as mock_log_info:
- operator.execute_complete(context=None, event={"status": "success", "records": [20]})
- mock_log_info.assert_called_with("Total extracted rows: %s", 1)
+ assert str(exc.value) == expected_exception_msg
+ def test_bigquery_check_operator_execute_complete(self):
+ """Asserts that logging occurs as expected"""
-def _get_value_check_async_operator(use_legacy_sql: bool = False):
- query = "SELECT COUNT(*) FROM Any"
- pass_val = 2
+ operator = BigQueryCheckOperator(
+ task_id="bq_check_operator_execute_complete",
+ sql="SELECT * FROM any",
+ location=TEST_DATASET_LOCATION,
+ deferrable=True,
+ )
- return BigQueryValueCheckOperator(
- task_id="check_value",
- sql=query,
- pass_value=pass_val,
- use_legacy_sql=use_legacy_sql,
- deferrable=True,
- )
+ with mock.patch.object(operator.log, "info") as mock_log_info:
+ operator.execute_complete(context=None, event={"status": "success", "records": [20]})
+ mock_log_info.assert_called_with("Success.")
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_bigquery_value_check_async(mock_hook):
- """
- Asserts that a task is deferred and a BigQueryValueCheckTrigger will be fired
- when the BigQueryValueCheckOperatorAsync is executed.
- """
- operator = _get_value_check_async_operator(True)
- job_id = "123456"
- hash_ = "hash"
- real_job_id = f"{job_id}_{hash_}"
- mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
- with pytest.raises(TaskDeferred) as exc:
- operator.execute(create_context(operator))
-
- assert isinstance(
- exc.value.trigger, BigQueryValueCheckTrigger
- ), "Trigger is not a BigQueryValueCheckTrigger"
-
-
-def test_bigquery_value_check_operator_execute_complete_success():
- """Tests response message in case of success event"""
- operator = _get_value_check_async_operator()
-
- assert (
- operator.execute_complete(context=None, event={"status": "success", "message": "Job completed!"})
- is None
+class TestBigQueryValueCheckOperator:
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ def test_bigquery_value_check_async(self, mock_hook, create_task_instance_of_operator):
+ """
+ Asserts that a task is deferred and a BigQueryValueCheckTrigger will be fired
+ when the BigQueryValueCheckOperatorAsync is executed.
+ """
+ ti = create_task_instance_of_operator(
+ BigQueryValueCheckOperator,
+ dag_id="dag_id",
+ task_id="check_value",
+ sql="SELECT COUNT(*) FROM Any",
+ pass_value=2,
+ use_legacy_sql=True,
+ deferrable=True,
+ )
+ job_id = "123456"
+ hash_ = "hash"
+ real_job_id = f"{job_id}_{hash_}"
+ mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+ with pytest.raises(TaskDeferred) as exc:
+ ti.task.execute(MagicMock())
+
+ assert isinstance(
+ exc.value.trigger, BigQueryValueCheckTrigger
+ ), "Trigger is not a BigQueryValueCheckTrigger"
+
+ @pytest.mark.parametrize(
+ "kwargs, expected",
+ [
+ ({"sql": "SELECT COUNT(*) from Any"}, "missing keyword argument 'pass_value'"),
+ ({"pass_value": "Any"}, "missing keyword argument 'sql'"),
+ ],
+ )
+ def test_bigquery_value_check_missing_param(self, kwargs, expected):
+ """Assert that an exception is raised when a required param is not passed to the deferrable BigQueryValueCheckOperator"""
+ with pytest.raises(AirflowException) as missing_param:
+ BigQueryValueCheckOperator(deferrable=True, **kwargs)
+ assert missing_param.value.args[0] == expected
+
+ def test_bigquery_value_check_empty(self):
+ """Assert that an exception is raised when a required param is not passed to the deferrable BigQueryValueCheckOperator"""
+ expected, expected1 = (
+ "missing keyword arguments 'sql', 'pass_value'",
+ "missing keyword arguments 'pass_value', 'sql'",
+ )
+ with pytest.raises(AirflowException) as missing_param:
+ BigQueryValueCheckOperator(deferrable=True, kwargs={})
+ assert (missing_param.value.args[0] == expected) or (missing_param.value.args[0] == expected1)
+
+ def test_bigquery_value_check_operator_execute_complete_success(self):
+ """Tests response message in case of success event"""
+ operator = BigQueryValueCheckOperator(
+ task_id="check_value",
+ sql="SELECT COUNT(*) FROM Any",
+ pass_value=2,
+ use_legacy_sql=False,
+ deferrable=True,
+ )
+
+ assert (
+ operator.execute_complete(context=None, event={"status": "success", "message": "Job completed!"})
+ is None
+ )
+
+ def test_bigquery_value_check_operator_execute_complete_failure(self):
+ """Tests that an AirflowException is raised in case of error event"""
+ operator = BigQueryValueCheckOperator(
+ task_id="check_value",
+ sql="SELECT COUNT(*) FROM Any",
+ pass_value=2,
+ use_legacy_sql=False,
+ deferrable=True,
+ )
-
-def test_bigquery_value_check_operator_execute_complete_failure():
- """Tests that an AirflowException is raised in case of error event"""
- operator = _get_value_check_async_operator()
-
- with pytest.raises(AirflowException):
- operator.execute_complete(context=None, event={"status": "error", "message": "test failure message"})
-
-
[email protected](
- "kwargs, expected",
- [
- ({"sql": "SELECT COUNT(*) from Any"}, "missing keyword argument 'pass_value'"),
- ({"pass_value": "Any"}, "missing keyword argument 'sql'"),
- ],
-)
-def test_bigquery_value_check_missing_param(kwargs, expected):
- """Assert the exception if require param not pass to BigQueryValueCheckOperatorAsync operator"""
- with pytest.raises(AirflowException) as missing_param:
- BigQueryValueCheckOperator(deferrable=True, **kwargs)
- assert missing_param.value.args[0] == expected
-
-
-def test_bigquery_value_check_empty():
- """Assert the exception if require param not pass to BigQueryValueCheckOperatorAsync operator"""
- expected, expected1 = (
- "missing keyword arguments 'sql', 'pass_value'",
- "missing keyword arguments 'pass_value', 'sql'",
+ with pytest.raises(AirflowException):
+ operator.execute_complete(
+ context=None, event={"status": "error", "message": "test failure message"}
+ )
+
+
+class TestBigQueryColumnCheckOperator:
+ @pytest.mark.parametrize(
+ "check_type, check_value, check_result",
+ [
+ ("equal_to", 0, 0),
+ ("greater_than", 0, 1),
+ ("less_than", 0, -1),
+ ("geq_to", 0, 1),
+ ("geq_to", 0, 0),
+ ("leq_to", 0, 0),
+ ("leq_to", 0, -1),
+ ],
+ )
- with pytest.raises(AirflowException) as missing_param:
- BigQueryValueCheckOperator(deferrable=True, kwargs={})
- assert (missing_param.value.args[0] == expected) or (missing_param.value.args[0] == expected1)
-
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
+ def test_bigquery_column_check_operator_succeeds(
+ self, mock_job, mock_hook, check_type, check_value, check_result, create_task_instance_of_operator
+ ):
+ mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
+ {"col_name": ["col1"], "check_type": ["min"], "check_result": [check_result]}
+ )
+ mock_hook.return_value.insert_job.return_value = mock_job
[email protected](
- "check_type, check_value, check_result",
- [
- ("equal_to", 0, 0),
- ("greater_than", 0, 1),
- ("less_than", 0, -1),
- ("geq_to", 0, 1),
- ("geq_to", 0, 0),
- ("leq_to", 0, 0),
- ("leq_to", 0, -1),
- ],
-)
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
[email protected]("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
-def test_bigquery_column_check_operator_succeeds(mock_job, mock_hook, check_type, check_value, check_result):
- mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
- {"col_name": ["col1"], "check_type": ["min"], "check_result": [check_result]}
- )
- mock_hook.return_value.insert_job.return_value = mock_job
-
- op = BigQueryColumnCheckOperator(
- task_id="check_column_succeeds",
- table=TEST_TABLE_ID,
- use_legacy_sql=False,
- column_mapping={
- "col1": {"min": {check_type: check_value}},
- },
+ ti = create_task_instance_of_operator(
+ BigQueryColumnCheckOperator,
+ dag_id="dag_id",
+ task_id="check_column_succeeds",
+ table=TEST_TABLE_ID,
+ use_legacy_sql=False,
+ column_mapping={
+ "col1": {"min": {check_type: check_value}},
+ },
+ )
+ ti.task.execute(MagicMock())
+
+ @pytest.mark.parametrize(
+ "check_type, check_value, check_result",
+ [
+ ("equal_to", 0, 1),
+ ("greater_than", 0, -1),
+ ("less_than", 0, 1),
+ ("geq_to", 0, -1),
+ ("leq_to", 0, 1),
+ ],
+ )
- op.execute(create_context(op))
-
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
+ def test_bigquery_column_check_operator_fails(
+ self, mock_job, mock_hook, check_type, check_value, check_result, create_task_instance_of_operator
+ ):
+ mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
+ {"col_name": ["col1"], "check_type": ["min"], "check_result": [check_result]}
+ )
+ mock_hook.return_value.insert_job.return_value = mock_job
[email protected](
- "check_type, check_value, check_result",
- [
- ("equal_to", 0, 1),
- ("greater_than", 0, -1),
- ("less_than", 0, 1),
- ("geq_to", 0, -1),
- ("leq_to", 0, 1),
- ],
-)
[email protected]("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
[email protected]("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
-def test_bigquery_column_check_operator_fails(mock_job, mock_hook, check_type, check_value, check_result):
- mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
- {"col_name": ["col1"], "check_type": ["min"], "check_result": [check_result]}
- )
- mock_hook.return_value.insert_job.return_value = mock_job
-
- op = BigQueryColumnCheckOperator(
- task_id="check_column_fails",
- table=TEST_TABLE_ID,
- use_legacy_sql=False,
- column_mapping={
- "col1": {"min": {check_type: check_value}},
- },
- )
- with pytest.raises(AirflowException):
- op.execute(create_context(op))
+ ti = create_task_instance_of_operator(
+ BigQueryColumnCheckOperator,
+ dag_id="dag_id",
+ task_id="check_column_fails",
+ table=TEST_TABLE_ID,
+ use_legacy_sql=False,
+ column_mapping={
+ "col1": {"min": {check_type: check_value}},
+ },
+ )
+ with pytest.raises(AirflowException):
+ ti.task.execute(MagicMock())
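
Note on the shared fixture: create_task_instance_of_operator comes from the test suite's conftest and replaces the hand-rolled create_context helper removed above. It builds a one-task DAG plus a real DagRun in the test database and returns the TaskInstance, so a test can call ti.task.execute(...) against a realistic context instead of assembling one by hand. A minimal sketch of such a fixture, assuming Airflow's dag_maker test fixture and an initialized test database (the signature of the project's actual fixture may differ):

    import pytest

    @pytest.fixture
    def create_task_instance_of_operator(dag_maker):
        """Build a one-task DAG, create a DagRun, and return its TaskInstance."""

        def _create(operator_class, *, dag_id, session=None, **operator_kwargs):
            # Register the operator inside a freshly built DAG.
            with dag_maker(dag_id=dag_id, session=session):
                operator_class(**operator_kwargs)
            # Materialize a DagRun; its single TaskInstance wraps the operator.
            (ti,) = dag_maker.create_dagrun().task_instances
            return ti

        return _create

This is also why test_bigquery_insert_job_operator_with_job_id_generate now asserts logical_date=ANY: the fixture creates a real DagRun, so the logical date is no longer the fixed datetime(2022, 1, 1) that the deleted create_context helper supplied.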