This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4358511fce Add `AirflowInternalRuntimeError` for raise "non catchable"
errors (#38778)
4358511fce is described below
commit 4358511fce2eab6d3f97edb1c95d09f4f5a49376
Author: Andrey Anshin <[email protected]>
AuthorDate: Sun Apr 7 13:23:27 2024 +0400
Add `AirflowInternalRuntimeError` for raise "non catchable" errors (#38778)
* Add `AirflowInternalRuntimeError` for raise non catchable errors
* Fix up non-DB tests or move them to DB tests
* Fix up MySQL test
---
airflow/exceptions.py | 10 ++++++
airflow/settings.py | 4 +--
tests/models/test_dagrun.py | 39 ++++++++++++++--------
.../amazon/aws/auth_manager/avp/test_facade.py | 1 +
tests/providers/amazon/aws/fs/test_s3.py | 7 ++++
tests/providers/amazon/aws/hooks/test_base_aws.py | 2 ++
tests/providers/amazon/aws/hooks/test_emr.py | 5 ++-
tests/providers/amazon/aws/operators/test_ecs.py | 1 +
tests/providers/amazon/conftest.py | 8 +++++
.../apache/hive/transfers/test_s3_to_hive.py | 1 +
.../cncf/kubernetes/operators/test_job.py | 14 +++++++-
.../cncf/kubernetes/operators/test_resource.py | 1 +
.../google/cloud/operators/test_datapipeline.py | 3 ++
.../cloud/operators/test_kubernetes_engine.py | 1 +
.../google/cloud/operators/test_vertex_ai.py | 1 +
tests/providers/mysql/hooks/test_mysql.py | 5 +--
tests/providers/samba/hooks/test_samba.py | 7 ++--
tests/providers/slack/hooks/test_slack_webhook.py | 12 +++++--
tests/serialization/test_dag_serialization.py | 19 ++++++-----
tests/utils/test_cli_util.py | 4 +++
tests/utils/test_email.py | 1 +
21 files changed, 112 insertions(+), 34 deletions(-)
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index ea2f5b0140..c497367989 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -107,6 +107,16 @@ class
AirflowOptionalProviderFeatureException(AirflowException):
"""Raise by providers when imports are missing for optional provider
features."""
+class AirflowInternalRuntimeError(BaseException):
+ """
+ Airflow Internal runtime error.
+
+ Indicates that something really terrible happens during the Airflow
execution.
+
+ :meta private:
+ """
+
+
class XComNotFound(AirflowException):
"""Raise when an XCom reference is being resolved against a non-existent
XCom."""
diff --git a/airflow/settings.py b/airflow/settings.py
index 2a2b1455bc..7ead89f34e 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -33,7 +33,7 @@ from sqlalchemy.pool import NullPool
from airflow import policies
from airflow.configuration import AIRFLOW_HOME, WEBSERVER_CONFIG, conf #
noqa: F401
-from airflow.exceptions import RemovedInAirflow3Warning
+from airflow.exceptions import AirflowInternalRuntimeError,
RemovedInAirflow3Warning
from airflow.executors import executor_constants
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers
@@ -210,7 +210,7 @@ class SkipDBTestsSession:
"""This fake session is used to skip DB tests when
`_AIRFLOW_SKIP_DB_TESTS` is set."""
def __init__(self):
- raise RuntimeError(
+ raise AirflowInternalRuntimeError(
"Your test accessed the DB but `_AIRFLOW_SKIP_DB_TESTS` is set.\n"
"Either make sure your test does not use database or mark the test
with `@pytest.mark.db_test`\n"
"See
https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#"
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 0a57dd7461..63865a26da 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import datetime
+import warnings
from functools import reduce
from typing import TYPE_CHECKING, Mapping
from unittest import mock
@@ -29,10 +30,9 @@ import pytest
from airflow import settings
from airflow.callbacks.callback_requests import DagCallbackRequest
from airflow.decorators import setup, task, task_group, teardown
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG, DagModel
-from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun, DagRunNote
from airflow.models.taskinstance import TaskInstance, TaskInstanceNote,
clear_task_instances
from airflow.models.taskmap import TaskMap
@@ -60,11 +60,28 @@ TI = TaskInstance
DEFAULT_DATE = pendulum.instance(_DEFAULT_DATE)
+@pytest.fixture(scope="module")
+def dagbag():
+ from airflow.models.dagbag import DagBag
+
+ with warnings.catch_warnings():
+ # Some dags use deprecated operators, e.g SubDagOperator
+ # if it is not imported, then it might have side effects for the other
tests
+ warnings.simplefilter("ignore", category=RemovedInAirflow3Warning)
+ # Ensure the DAGs we are looking at from the DB are up-to-date
+ dag_bag = DagBag(include_examples=True)
+ return dag_bag
+
+
class TestDagRun:
- dagbag = DagBag(include_examples=True)
+ @pytest.fixture(autouse=True)
+ def setup_test_cases(self):
+ self._clean_db()
+ yield
+ self._clean_db()
@staticmethod
- def clean_db():
+ def _clean_db():
db.clear_db_runs()
db.clear_db_pools()
db.clear_db_dags()
@@ -73,12 +90,6 @@ class TestDagRun:
db.clear_db_xcom()
db.clear_db_task_fail()
- def setup_class(self) -> None:
- self.clean_db()
-
- def teardown_method(self) -> None:
- self.clean_db()
-
def create_dag_run(
self,
dag: DAG,
@@ -741,10 +752,10 @@ class TestDagRun:
(None, False),
],
)
- def test_depends_on_past(self, session, prev_ti_state, is_ti_success):
+ def test_depends_on_past(self, dagbag, session, prev_ti_state,
is_ti_success):
dag_id = "test_depends_on_past"
- dag = self.dagbag.get_dag(dag_id)
+ dag = dagbag.get_dag(dag_id)
task = dag.tasks[0]
dag_run_1 = self.create_dag_run(
@@ -778,9 +789,9 @@ class TestDagRun:
(None, False),
],
)
- def test_wait_for_downstream(self, session, prev_ti_state, is_ti_success):
+ def test_wait_for_downstream(self, dagbag, session, prev_ti_state,
is_ti_success):
dag_id = "test_wait_for_downstream"
- dag = self.dagbag.get_dag(dag_id)
+ dag = dagbag.get_dag(dag_id)
upstream, downstream = dag.tasks
# For ti.set_state() to work, the DagRun has to exist,
diff --git a/tests/providers/amazon/aws/auth_manager/avp/test_facade.py
b/tests/providers/amazon/aws/auth_manager/avp/test_facade.py
index 1e74168278..0daae8811e 100644
--- a/tests/providers/amazon/aws/auth_manager/avp/test_facade.py
+++ b/tests/providers/amazon/aws/auth_manager/avp/test_facade.py
@@ -44,6 +44,7 @@ test_user_no_group =
AwsAuthManagerUser(user_id="test_user_no_group", groups=[])
def facade():
with conf_vars(
{
+ ("aws_auth_manager", "conn_id"): "aws_default",
("aws_auth_manager", "region_name"): REGION_NAME,
("aws_auth_manager", "avp_policy_store_id"): AVP_POLICY_STORE_ID,
}
diff --git a/tests/providers/amazon/aws/fs/test_s3.py
b/tests/providers/amazon/aws/fs/test_s3.py
index ab9c5c1092..962f26060a 100644
--- a/tests/providers/amazon/aws/fs/test_s3.py
+++ b/tests/providers/amazon/aws/fs/test_s3.py
@@ -35,6 +35,13 @@ TEST_HEADER_VALUE = "payload"
TEST_REQ_URI = "s3://bucket/key"
+@pytest.fixture(scope="module", autouse=True)
+def _setup_connections():
+ with pytest.MonkeyPatch.context() as mp_ctx:
+ mp_ctx.setenv(f"AIRFLOW_CONN_{TEST_CONN}".upper(), "aws://")
+ yield
+
+
class TestFilesystem:
def test_get_s3fs(self):
import s3fs
diff --git a/tests/providers/amazon/aws/hooks/test_base_aws.py
b/tests/providers/amazon/aws/hooks/test_base_aws.py
index a55432b6ef..49eb5c4058 100644
--- a/tests/providers/amazon/aws/hooks/test_base_aws.py
+++ b/tests/providers/amazon/aws/hooks/test_base_aws.py
@@ -427,6 +427,7 @@ class TestAwsBaseHook:
assert mock_class_name.call_count == len(found_classes)
assert user_agent_tags["Caller"] == found_classes[-1]
+ @pytest.mark.db_test
@mock.patch.object(AwsEcsExecutor, "_load_run_kwargs")
def test_user_agent_caller_target_executor_found(self,
mock_load_run_kwargs):
with conf_vars(
@@ -449,6 +450,7 @@ class TestAwsBaseHook:
assert user_agent_tags["Caller"] == default_caller_name
+ @pytest.mark.db_test
@pytest.mark.parametrize("env_var, expected_version",
[({"AIRFLOW_CTX_DAG_ID": "banana"}, 5), [{}, None]])
@mock.patch.object(AwsBaseHook, "_get_caller", return_value="Test")
def test_user_agent_dag_run_key_is_hashed_correctly(self, _, env_var,
expected_version):
diff --git a/tests/providers/amazon/aws/hooks/test_emr.py
b/tests/providers/amazon/aws/hooks/test_emr.py
index 0a58d372b1..aaa572cc5e 100644
--- a/tests/providers/amazon/aws/hooks/test_emr.py
+++ b/tests/providers/amazon/aws/hooks/test_emr.py
@@ -175,7 +175,9 @@ class TestEmrHook:
if sys.version_info >= (3, 12):
# Botocore generates deprecation warning on Python 3.12
connected with utcnow use
warnings.filterwarnings("ignore",
message=r".*datetime.utcnow.*", category=DeprecationWarning)
- cluster = hook.create_job_flow({"Name": "test_cluster",
"ReleaseLabel": "", "AmiVersion": "3.2"})
+ cluster = hook.create_job_flow(
+ {"Name": "test_cluster", "ReleaseLabel": "", "AmiVersion":
"3.2", "Instances": {}}
+ )
cluster =
client.describe_cluster(ClusterId=cluster["JobFlowId"])["Cluster"]
# The AmiVersion comes back as {Requested,Running}AmiVersion fields.
@@ -192,6 +194,7 @@ class TestEmrHook:
hook.create_job_flow(job_flow_overrides)
mock_run_job_flow.assert_called_once_with(**job_flow_overrides)
+ @pytest.mark.db_test
@mock.patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook.get_conn")
def test_missing_emr_conn_id(self, mock_boto3_client):
"""Test not exists ``emr_conn_id``."""
diff --git a/tests/providers/amazon/aws/operators/test_ecs.py
b/tests/providers/amazon/aws/operators/test_ecs.py
index 136e776b7d..554ccad514 100644
--- a/tests/providers/amazon/aws/operators/test_ecs.py
+++ b/tests/providers/amazon/aws/operators/test_ecs.py
@@ -99,6 +99,7 @@ class EcsBaseTestCase:
def setup_test_cases(self, monkeypatch):
self.client = boto3.client("ecs", region_name="eu-west-3")
monkeypatch.setattr(EcsHook, "conn", self.client)
+ monkeypatch.setenv("AIRFLOW_CONN_AWS_TEST_CONN", '{"conn_type":
"aws"}')
class TestEcsBaseOperator(EcsBaseTestCase):
diff --git a/tests/providers/amazon/conftest.py
b/tests/providers/amazon/conftest.py
index 4ea7f429bc..ca31f1b9ff 100644
--- a/tests/providers/amazon/conftest.py
+++ b/tests/providers/amazon/conftest.py
@@ -102,3 +102,11 @@ def set_default_aws_settings(aws_testing_env_vars,
monkeypatch):
monkeypatch.delenv(env_name, raising=False)
for env_name, value in aws_testing_env_vars.items():
monkeypatch.setenv(env_name, value)
+
+
+@pytest.fixture(scope="package", autouse=True)
+def setup_default_aws_connections():
+ with pytest.MonkeyPatch.context() as mp_ctx:
+ mp_ctx.setenv("AIRFLOW_CONN_AWS_DEFAULT", '{"conn_type": "aws"}')
+ mp_ctx.setenv("AIRFLOW_CONN_EMR_DEFAULT", '{"conn_type": "emr",
"extra": {}}')
+ yield
diff --git a/tests/providers/apache/hive/transfers/test_s3_to_hive.py
b/tests/providers/apache/hive/transfers/test_s3_to_hive.py
index 3e6248f271..5f738f4b54 100644
--- a/tests/providers/apache/hive/transfers/test_s3_to_hive.py
+++ b/tests/providers/apache/hive/transfers/test_s3_to_hive.py
@@ -37,6 +37,7 @@ moto = pytest.importorskip("moto")
logger = logging.getLogger(__name__)
+@pytest.mark.db_test
class TestS3ToHiveTransfer:
@pytest.fixture(autouse=True)
def setup_attrs(self):
diff --git a/tests/providers/cncf/kubernetes/operators/test_job.py
b/tests/providers/cncf/kubernetes/operators/test_job.py
index a21dbeebd6..d776da2482 100644
--- a/tests/providers/cncf/kubernetes/operators/test_job.py
+++ b/tests/providers/cncf/kubernetes/operators/test_job.py
@@ -78,6 +78,7 @@ def create_context(task, persist_to_db=False, map_index=None):
}
+@pytest.mark.db_test
@pytest.mark.execution_timeout(300)
class TestKubernetesJobOperator:
@pytest.fixture(autouse=True)
@@ -89,7 +90,6 @@ class TestKubernetesJobOperator:
patch.stopall()
- @pytest.mark.db_test
def test_templates(self, create_task_instance_of_operator):
dag_id = "TestKubernetesJobOperator"
ti = create_task_instance_of_operator(
@@ -465,6 +465,7 @@ class TestKubernetesJobOperator:
job = k.build_job_request_obj({})
assert re.match(r"job-a-very-reasonable-task-name-[a-z0-9-]+",
job.metadata.name) is not None
+ @pytest.mark.non_db_test_override
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
@patch(HOOK_CLASS)
@@ -496,6 +497,7 @@ class TestKubernetesJobOperator:
assert execute_result is None
assert not mock_hook.wait_until_job_complete.called
+ @pytest.mark.non_db_test_override
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.execute_deferrable"))
@@ -531,6 +533,7 @@ class TestKubernetesJobOperator:
assert actual_result is None
assert not mock_hook.wait_until_job_complete.called
+ @pytest.mark.non_db_test_override
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
@patch(HOOK_CLASS)
@@ -545,6 +548,7 @@ class TestKubernetesJobOperator:
with pytest.raises(AirflowException):
op.execute(context=dict(ti=mock.MagicMock()))
+ @pytest.mark.non_db_test_override
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.defer"))
@patch(JOB_OPERATORS_PATH.format("KubernetesJobTrigger"))
def test_execute_deferrable(self, mock_trigger, mock_execute_deferrable):
@@ -587,6 +591,7 @@ class TestKubernetesJobOperator:
)
assert actual_result is None
+ @pytest.mark.non_db_test_override
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
@patch(f"{HOOK_CLASS}.wait_until_job_complete")
@@ -609,6 +614,7 @@ class TestKubernetesJobOperator:
job_poll_interval=POLL_INTERVAL,
)
+ @pytest.mark.non_db_test_override
def test_execute_complete(self):
mock_ti = mock.MagicMock()
context = {"ti": mock_ti}
@@ -619,6 +625,7 @@ class TestKubernetesJobOperator:
mock_ti.xcom_push.assert_called_once_with(key="job", value=mock_job)
+ @pytest.mark.non_db_test_override
def test_execute_complete_fail(self):
mock_ti = mock.MagicMock()
context = {"ti": mock_ti}
@@ -630,6 +637,7 @@ class TestKubernetesJobOperator:
mock_ti.xcom_push.assert_called_once_with(key="job", value=mock_job)
+ @pytest.mark.non_db_test_override
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.client"))
@patch(HOOK_CLASS)
def test_on_kill(self, mock_hook, mock_client):
@@ -650,6 +658,7 @@ class TestKubernetesJobOperator:
)
mock_serialize.assert_called_once_with(mock_job)
+ @pytest.mark.non_db_test_override
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.client"))
@patch(HOOK_CLASS)
def test_on_kill_termination_grace_period(self, mock_hook, mock_client):
@@ -674,6 +683,7 @@ class TestKubernetesJobOperator:
)
mock_serialize.assert_called_once_with(mock_job)
+ @pytest.mark.non_db_test_override
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.client"))
@patch(HOOK_CLASS)
def test_on_kill_none_job(self, mock_hook, mock_client):
@@ -686,6 +696,7 @@ class TestKubernetesJobOperator:
mock_serialize.assert_not_called()
+@pytest.mark.db_test
@pytest.mark.execution_timeout(300)
class TestKubernetesDeleteJobOperator:
@pytest.fixture(autouse=True)
@@ -824,6 +835,7 @@ class TestKubernetesPatchJobOperator:
patch.stopall()
+ @pytest.mark.db_test
@patch("kubernetes.config.load_kube_config")
@patch("kubernetes.client.api.BatchV1Api.patch_namespaced_job")
def test_update_execute(self, mock_patch_namespaced_job,
mock_load_kube_config):
diff --git a/tests/providers/cncf/kubernetes/operators/test_resource.py
b/tests/providers/cncf/kubernetes/operators/test_resource.py
index 4a32d51d0e..d2cef33d84 100644
--- a/tests/providers/cncf/kubernetes/operators/test_resource.py
+++ b/tests/providers/cncf/kubernetes/operators/test_resource.py
@@ -76,6 +76,7 @@ metadata:
HOOK_CLASS =
"airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook"
+@pytest.mark.db_test
@patch("airflow.utils.context.Context")
class TestKubernetesXResourceOperator:
@pytest.fixture(autouse=True)
diff --git a/tests/providers/google/cloud/operators/test_datapipeline.py
b/tests/providers/google/cloud/operators/test_datapipeline.py
index 6f0af0a848..2a948edf78 100644
--- a/tests/providers/google/cloud/operators/test_datapipeline.py
+++ b/tests/providers/google/cloud/operators/test_datapipeline.py
@@ -82,6 +82,7 @@ class TestCreateDataPipelineOperator:
project_id=TEST_PROJECTID, body=TEST_BODY, location=TEST_LOCATION
)
+ @pytest.mark.db_test
def test_body_invalid(self):
"""
Test that if the operator is not passed a Request Body, an
AirflowException is raised
@@ -124,6 +125,7 @@ class TestCreateDataPipelineOperator:
with pytest.raises(AirflowException):
CreateDataPipelineOperator(**init_kwargs).execute(mock.MagicMock())
+ @pytest.mark.db_test
def test_response_invalid(self):
"""
Test that if the Response Body contains an error message, an
AirflowException is raised
@@ -139,6 +141,7 @@ class TestCreateDataPipelineOperator:
CreateDataPipelineOperator(**init_kwargs).execute(mock.MagicMock())
+@pytest.mark.db_test
class TestRunDataPipelineOperator:
@pytest.fixture
def run_operator(self):
diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
index 4ed8076ce0..442cdde68e 100644
--- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
@@ -493,6 +493,7 @@ class TestGKEStartKueueInsideClusterOperator:
self.gke_op._cluster_url = CLUSTER_URL
self.gke_op._ssl_ca_cert = SSL_CA_CERT
+ @pytest.mark.db_test
@mock.patch.dict(os.environ, {})
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
diff --git a/tests/providers/google/cloud/operators/test_vertex_ai.py
b/tests/providers/google/cloud/operators/test_vertex_ai.py
index 2e0cee39b9..6004a1bd43 100644
--- a/tests/providers/google/cloud/operators/test_vertex_ai.py
+++ b/tests/providers/google/cloud/operators/test_vertex_ai.py
@@ -1568,6 +1568,7 @@ class TestVertexAICreateHyperparameterTuningJobOperator:
op.execute(context={"ti": mock.MagicMock()})
mock_defer.assert_called_once()
+ @pytest.mark.db_test
def test_deferrable_sync_error(self):
op = CreateHyperparameterTuningJobOperator(
task_id=TASK_ID,
diff --git a/tests/providers/mysql/hooks/test_mysql.py
b/tests/providers/mysql/hooks/test_mysql.py
index 271e249193..e6e8bd6ca5 100644
--- a/tests/providers/mysql/hooks/test_mysql.py
+++ b/tests/providers/mysql/hooks/test_mysql.py
@@ -168,8 +168,9 @@ class TestMySqlHookConn:
@mock.patch("MySQLdb.connect")
@mock.patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook.get_client_type")
- def test_get_conn_rds_iam(self, mock_client, mock_connect):
- self.connection.extra = '{"iam":true}'
+ def test_get_conn_rds_iam(self, mock_client, mock_connect, monkeypatch):
+ monkeypatch.setenv("AIRFLOW_CONN_TEST_AWS_IAM_CONN", '{"conn_type":
"aws"}')
+ self.connection.extra = '{"iam":true, "aws_conn_id":
"test_aws_iam_conn"}'
mock_client.return_value.generate_db_auth_token.return_value =
"aws_token"
self.db_hook.get_conn()
mock_connect.assert_called_once_with(
diff --git a/tests/providers/samba/hooks/test_samba.py
b/tests/providers/samba/hooks/test_samba.py
index f6be5b52cd..f1d8fbbfb0 100644
--- a/tests/providers/samba/hooks/test_samba.py
+++ b/tests/providers/samba/hooks/test_samba.py
@@ -22,7 +22,7 @@ from unittest import mock
import pytest
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowNotFoundException
from airflow.models import Connection
from airflow.providers.samba.hooks.samba import SambaHook
@@ -37,9 +37,10 @@ CONNECTION = Connection(
class TestSambaHook:
+ @pytest.mark.db_test
def test_get_conn_should_fail_if_conn_id_does_not_exist(self):
- with pytest.raises(AirflowException):
- SambaHook("conn")
+ with pytest.raises(AirflowNotFoundException):
+ SambaHook("non-existed-connection-id")
@mock.patch("smbclient.register_session")
@mock.patch("airflow.hooks.base.BaseHook.get_connection")
diff --git a/tests/providers/slack/hooks/test_slack_webhook.py
b/tests/providers/slack/hooks/test_slack_webhook.py
index 0af09e3068..cf2d406533 100644
--- a/tests/providers/slack/hooks/test_slack_webhook.py
+++ b/tests/providers/slack/hooks/test_slack_webhook.py
@@ -123,6 +123,14 @@ def slack_webhook_connections():
schema="http",
host="some.netloc",
),
+ # Not supported anymore
+ Connection(conn_id="conn_token_in_host_1", conn_type=CONN_TYPE,
host=TEST_WEBHOOK_URL),
+ Connection(
+ conn_id="conn_token_in_host_2",
+ conn_type=CONN_TYPE,
+ schema="https",
+
host="hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
+ ),
]
with pytest.MonkeyPatch.context() as mp:
for conn in connections:
@@ -196,9 +204,9 @@ class TestSlackWebhookHook:
@pytest.mark.parametrize("conn_id", ["conn_token_in_host_1",
"conn_token_in_host_2"])
def test_wrong_connections(self, conn_id):
- """Test previously valid connections, but now it is dropped."""
+ """Test previously valid connections, but now support of it is
dropped."""
hook = SlackWebhookHook(slack_webhook_conn_id=conn_id)
- with pytest.raises(AirflowNotFoundException):
+ with pytest.raises(AirflowNotFoundException, match="does not contain
password"):
hook._get_conn_params()
@pytest.mark.parametrize(
diff --git a/tests/serialization/test_dag_serialization.py
b/tests/serialization/test_dag_serialization.py
index 73d55e28da..992d237aac 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -384,10 +384,10 @@ def timetable_plugin(monkeypatch):
class TestStringifiedDAGs:
"""Unit tests for stringified DAGs."""
- def setup_method(self):
- self.backup_base_hook_get_connection = BaseHook.get_connection
- BaseHook.get_connection = mock.Mock(
- return_value=Connection(
+ @pytest.fixture(autouse=True)
+ def setup_test_cases(self):
+ with mock.patch.object(BaseHook, "get_connection") as m:
+ m.return_value = Connection(
extra=(
"{"
'"project_id": "mock", '
@@ -399,11 +399,6 @@ class TestStringifiedDAGs:
"}"
)
)
- )
- self.maxDiff = None
-
- def teardown_method(self):
- BaseHook.get_connection = self.backup_base_hook_get_connection
@pytest.mark.db_test
def test_serialization(self):
@@ -422,6 +417,7 @@ class TestStringifiedDAGs:
)
assert actual == expected
+ @pytest.mark.db_test
@pytest.mark.parametrize(
"timetable, serialized_timetable",
[
@@ -458,6 +454,7 @@ class TestStringifiedDAGs:
print(task["task_id"], k, v)
assert actual == expected
+ @pytest.mark.db_test
def test_dag_serialization_preserves_empty_access_roles(self):
"""Verify that an explicitly empty access_control dict is preserved."""
dag = collect_dags(["airflow/example_dags"])["simple_dag"]
@@ -467,6 +464,7 @@ class TestStringifiedDAGs:
assert serialized_dag["dag"]["_access_control"] == {"__type": "dict",
"__var": {}}
+ @pytest.mark.db_test
def test_dag_serialization_unregistered_custom_timetable(self):
"""Verify serialization fails without timetable registration."""
dag =
get_timetable_based_simple_dag(CustomSerializationTimetable("bar"))
@@ -506,6 +504,7 @@ class TestStringifiedDAGs:
expected = json.loads(json.dumps(sorted_serialized_dag(expected)))
return actual, expected
+ @pytest.mark.db_test
def test_deserialization_across_process(self):
"""A serialized DAG can be deserialized in another process."""
@@ -533,6 +532,7 @@ class TestStringifiedDAGs:
for dag_id in stringified_dags:
self.validate_deserialized_dag(stringified_dags[dag_id],
dags[dag_id])
+ @pytest.mark.db_test
def test_roundtrip_provider_example_dags(self):
dags = collect_dags(
[
@@ -546,6 +546,7 @@ class TestStringifiedDAGs:
serialized_dag =
SerializedDAG.from_json(SerializedDAG.to_json(dag))
self.validate_deserialized_dag(serialized_dag, dag)
+ @pytest.mark.db_test
@pytest.mark.parametrize(
"timetable",
[cron_timetable("0 0 * * *"), CustomSerializationTimetable("foo")],
diff --git a/tests/utils/test_cli_util.py b/tests/utils/test_cli_util.py
index 03056d3e20..9d431ace59 100644
--- a/tests/utils/test_cli_util.py
+++ b/tests/utils/test_cli_util.py
@@ -35,6 +35,10 @@ from airflow.models.log import Log
from airflow.utils import cli, cli_action_loggers, timezone
from airflow.utils.cli import _search_for_dag_file, get_dag_by_pickle
+# Mark entire module as db_test because ``action_cli`` wrapper still could use
DB on callbacks:
+# - ``cli_action_loggers.on_pre_execution``
+# - ``cli_action_loggers.on_post_execution``
+pytestmark = pytest.mark.db_test
repo_root = Path(airflow.__file__).parent.parent
diff --git a/tests/utils/test_email.py b/tests/utils/test_email.py
index f5b6805de3..2cdff5baa8 100644
--- a/tests/utils/test_email.py
+++ b/tests/utils/test_email.py
@@ -145,6 +145,7 @@ class TestEmail:
assert msg["To"] == ",".join(recipients)
+@pytest.mark.db_test
class TestEmailSmtp:
@mock.patch("airflow.utils.email.send_mime_email")
def test_send_smtp(self, mock_send_mime, tmp_path):