This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4358511fce Add `AirflowInternalRuntimeError` for raise "non catchable" errors (#38778)
4358511fce is described below

commit 4358511fce2eab6d3f97edb1c95d09f4f5a49376
Author: Andrey Anshin <[email protected]>
AuthorDate: Sun Apr 7 13:23:27 2024 +0400

    Add `AirflowInternalRuntimeError` for raise "non catchable" errors (#38778)
    
    * Add `AirflowInternalRuntimeError` for raise non catchable errors
    
    * Fixup non-db tests or move it as db tests
    
    * Fixup mysql test
---
 airflow/exceptions.py                              | 10 ++++++
 airflow/settings.py                                |  4 +--
 tests/models/test_dagrun.py                        | 39 ++++++++++++++--------
 .../amazon/aws/auth_manager/avp/test_facade.py     |  1 +
 tests/providers/amazon/aws/fs/test_s3.py           |  7 ++++
 tests/providers/amazon/aws/hooks/test_base_aws.py  |  2 ++
 tests/providers/amazon/aws/hooks/test_emr.py       |  5 ++-
 tests/providers/amazon/aws/operators/test_ecs.py   |  1 +
 tests/providers/amazon/conftest.py                 |  8 +++++
 .../apache/hive/transfers/test_s3_to_hive.py       |  1 +
 .../cncf/kubernetes/operators/test_job.py          | 14 +++++++-
 .../cncf/kubernetes/operators/test_resource.py     |  1 +
 .../google/cloud/operators/test_datapipeline.py    |  3 ++
 .../cloud/operators/test_kubernetes_engine.py      |  1 +
 .../google/cloud/operators/test_vertex_ai.py       |  1 +
 tests/providers/mysql/hooks/test_mysql.py          |  5 +--
 tests/providers/samba/hooks/test_samba.py          |  7 ++--
 tests/providers/slack/hooks/test_slack_webhook.py  | 12 +++++--
 tests/serialization/test_dag_serialization.py      | 19 ++++++-----
 tests/utils/test_cli_util.py                       |  4 +++
 tests/utils/test_email.py                          |  1 +
 21 files changed, 112 insertions(+), 34 deletions(-)

diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index ea2f5b0140..c497367989 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -107,6 +107,16 @@ class AirflowOptionalProviderFeatureException(AirflowException):
     """Raise by providers when imports are missing for optional provider features."""
 
 
+class AirflowInternalRuntimeError(BaseException):
+    """
+    Airflow Internal runtime error.
+
+    Indicates that something really terrible happens during the Airflow execution.
+
+    :meta private:
+    """
+
+
 class XComNotFound(AirflowException):
     """Raise when an XCom reference is being resolved against a non-existent 
XCom."""
 
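The key design point in the hunk above is that the new class derives from BaseException rather than AirflowException, so ordinary `except Exception` handlers in tests or callbacks cannot swallow it. A minimal sketch (illustrative only, not taken from the commit):

    # Minimal sketch (illustrative, not from the commit): a BaseException subclass
    # escapes broad "except Exception" handlers, which is what makes the error
    # effectively "non catchable" for ordinary application code.
    class AirflowInternalRuntimeError(BaseException):
        """Airflow internal runtime error."""

    def guarded_call():
        try:
            raise AirflowInternalRuntimeError("something really terrible happened")
        except Exception:
            # Never reached: the error is not an Exception subclass,
            # so it propagates to the caller and fails loudly.
            return "swallowed"

    # guarded_call() raises AirflowInternalRuntimeError instead of returning.
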
diff --git a/airflow/settings.py b/airflow/settings.py
index 2a2b1455bc..7ead89f34e 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -33,7 +33,7 @@ from sqlalchemy.pool import NullPool
 
 from airflow import policies
 from airflow.configuration import AIRFLOW_HOME, WEBSERVER_CONFIG, conf  # noqa: F401
-from airflow.exceptions import RemovedInAirflow3Warning
+from airflow.exceptions import AirflowInternalRuntimeError, RemovedInAirflow3Warning
 from airflow.executors import executor_constants
 from airflow.logging_config import configure_logging
 from airflow.utils.orm_event_handlers import setup_event_handlers
@@ -210,7 +210,7 @@ class SkipDBTestsSession:
     """This fake session is used to skip DB tests when 
`_AIRFLOW_SKIP_DB_TESTS` is set."""
 
     def __init__(self):
-        raise RuntimeError(
+        raise AirflowInternalRuntimeError(
             "Your test accessed the DB but `_AIRFLOW_SKIP_DB_TESTS` is set.\n"
             "Either make sure your test does not use database or mark the test 
with `@pytest.mark.db_test`\n"
             "See 
https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#";
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 0a57dd7461..63865a26da 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import datetime
+import warnings
 from functools import reduce
 from typing import TYPE_CHECKING, Mapping
 from unittest import mock
@@ -29,10 +30,9 @@ import pytest
 from airflow import settings
 from airflow.callbacks.callback_requests import DagCallbackRequest
 from airflow.decorators import setup, task, task_group, teardown
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DAG, DagModel
-from airflow.models.dagbag import DagBag
 from airflow.models.dagrun import DagRun, DagRunNote
 from airflow.models.taskinstance import TaskInstance, TaskInstanceNote, clear_task_instances
 from airflow.models.taskmap import TaskMap
@@ -60,11 +60,28 @@ TI = TaskInstance
 DEFAULT_DATE = pendulum.instance(_DEFAULT_DATE)
 
 
[email protected](scope="module")
+def dagbag():
+    from airflow.models.dagbag import DagBag
+
+    with warnings.catch_warnings():
+        # Some dags use deprecated operators, e.g SubDagOperator
+        # if it is not imported, then it might have side effects for the other tests
+        warnings.simplefilter("ignore", category=RemovedInAirflow3Warning)
+        # Ensure the DAGs we are looking at from the DB are up-to-date
+        dag_bag = DagBag(include_examples=True)
+    return dag_bag
+
+
 class TestDagRun:
-    dagbag = DagBag(include_examples=True)
+    @pytest.fixture(autouse=True)
+    def setup_test_cases(self):
+        self._clean_db()
+        yield
+        self._clean_db()
 
     @staticmethod
-    def clean_db():
+    def _clean_db():
         db.clear_db_runs()
         db.clear_db_pools()
         db.clear_db_dags()
@@ -73,12 +90,6 @@ class TestDagRun:
         db.clear_db_xcom()
         db.clear_db_task_fail()
 
-    def setup_class(self) -> None:
-        self.clean_db()
-
-    def teardown_method(self) -> None:
-        self.clean_db()
-
     def create_dag_run(
         self,
         dag: DAG,
@@ -741,10 +752,10 @@ class TestDagRun:
             (None, False),
         ],
     )
-    def test_depends_on_past(self, session, prev_ti_state, is_ti_success):
+    def test_depends_on_past(self, dagbag, session, prev_ti_state, is_ti_success):
         dag_id = "test_depends_on_past"
 
-        dag = self.dagbag.get_dag(dag_id)
+        dag = dagbag.get_dag(dag_id)
         task = dag.tasks[0]
 
         dag_run_1 = self.create_dag_run(
@@ -778,9 +789,9 @@ class TestDagRun:
             (None, False),
         ],
     )
-    def test_wait_for_downstream(self, session, prev_ti_state, is_ti_success):
+    def test_wait_for_downstream(self, dagbag, session, prev_ti_state, is_ti_success):
         dag_id = "test_wait_for_downstream"
-        dag = self.dagbag.get_dag(dag_id)
+        dag = dagbag.get_dag(dag_id)
         upstream, downstream = dag.tasks
 
         # For ti.set_state() to work, the DagRun has to exist,
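The test_dagrun.py changes above replace the xunit-style setup_class/teardown_method hooks with pytest fixtures: a module-scoped `dagbag` fixture built once per test module, plus an autouse per-test cleanup fixture. A simplified sketch of that pattern (resource and test names are hypothetical):

    # Simplified sketch of the fixture pattern used above; names are hypothetical.
    import pytest

    @pytest.fixture(scope="module")
    def expensive_resource():
        # Created once per module, instead of at class-definition time.
        return object()

    class TestExample:
        @pytest.fixture(autouse=True)
        def setup_test_cases(self):
            # Runs around every test in the class: clean up before and after.
            yield

        def test_something(self, expensive_resource):
            assert expensive_resource is not None
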
diff --git a/tests/providers/amazon/aws/auth_manager/avp/test_facade.py b/tests/providers/amazon/aws/auth_manager/avp/test_facade.py
index 1e74168278..0daae8811e 100644
--- a/tests/providers/amazon/aws/auth_manager/avp/test_facade.py
+++ b/tests/providers/amazon/aws/auth_manager/avp/test_facade.py
@@ -44,6 +44,7 @@ test_user_no_group = AwsAuthManagerUser(user_id="test_user_no_group", groups=[])
 def facade():
     with conf_vars(
         {
+            ("aws_auth_manager", "conn_id"): "aws_default",
             ("aws_auth_manager", "region_name"): REGION_NAME,
             ("aws_auth_manager", "avp_policy_store_id"): AVP_POLICY_STORE_ID,
         }
diff --git a/tests/providers/amazon/aws/fs/test_s3.py b/tests/providers/amazon/aws/fs/test_s3.py
index ab9c5c1092..962f26060a 100644
--- a/tests/providers/amazon/aws/fs/test_s3.py
+++ b/tests/providers/amazon/aws/fs/test_s3.py
@@ -35,6 +35,13 @@ TEST_HEADER_VALUE = "payload"
 TEST_REQ_URI = "s3://bucket/key"
 
 
[email protected](scope="module", autouse=True)
+def _setup_connections():
+    with pytest.MonkeyPatch.context() as mp_ctx:
+        mp_ctx.setenv(f"AIRFLOW_CONN_{TEST_CONN}".upper(), "aws://")
+        yield
+
+
 class TestFilesystem:
     def test_get_s3fs(self):
         import s3fs
diff --git a/tests/providers/amazon/aws/hooks/test_base_aws.py b/tests/providers/amazon/aws/hooks/test_base_aws.py
index a55432b6ef..49eb5c4058 100644
--- a/tests/providers/amazon/aws/hooks/test_base_aws.py
+++ b/tests/providers/amazon/aws/hooks/test_base_aws.py
@@ -427,6 +427,7 @@ class TestAwsBaseHook:
         assert mock_class_name.call_count == len(found_classes)
         assert user_agent_tags["Caller"] == found_classes[-1]
 
+    @pytest.mark.db_test
     @mock.patch.object(AwsEcsExecutor, "_load_run_kwargs")
     def test_user_agent_caller_target_executor_found(self, mock_load_run_kwargs):
         with conf_vars(
@@ -449,6 +450,7 @@ class TestAwsBaseHook:
 
         assert user_agent_tags["Caller"] == default_caller_name
 
+    @pytest.mark.db_test
     @pytest.mark.parametrize("env_var, expected_version", 
[({"AIRFLOW_CTX_DAG_ID": "banana"}, 5), [{}, None]])
     @mock.patch.object(AwsBaseHook, "_get_caller", return_value="Test")
     def test_user_agent_dag_run_key_is_hashed_correctly(self, _, env_var, 
expected_version):
diff --git a/tests/providers/amazon/aws/hooks/test_emr.py b/tests/providers/amazon/aws/hooks/test_emr.py
index 0a58d372b1..aaa572cc5e 100644
--- a/tests/providers/amazon/aws/hooks/test_emr.py
+++ b/tests/providers/amazon/aws/hooks/test_emr.py
@@ -175,7 +175,9 @@ class TestEmrHook:
             if sys.version_info >= (3, 12):
                 # Botocore generates deprecation warning on Python 3.12 connected with utcnow use
                 warnings.filterwarnings("ignore", message=r".*datetime.utcnow.*", category=DeprecationWarning)
-            cluster = hook.create_job_flow({"Name": "test_cluster", "ReleaseLabel": "", "AmiVersion": "3.2"})
+            cluster = hook.create_job_flow(
+                {"Name": "test_cluster", "ReleaseLabel": "", "AmiVersion": "3.2", "Instances": {}}
+            )
         cluster = client.describe_cluster(ClusterId=cluster["JobFlowId"])["Cluster"]
 
         # The AmiVersion comes back as {Requested,Running}AmiVersion fields.
@@ -192,6 +194,7 @@ class TestEmrHook:
         hook.create_job_flow(job_flow_overrides)
         mock_run_job_flow.assert_called_once_with(**job_flow_overrides)
 
+    @pytest.mark.db_test
     @mock.patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook.get_conn")
     def test_missing_emr_conn_id(self, mock_boto3_client):
         """Test not exists ``emr_conn_id``."""
diff --git a/tests/providers/amazon/aws/operators/test_ecs.py b/tests/providers/amazon/aws/operators/test_ecs.py
index 136e776b7d..554ccad514 100644
--- a/tests/providers/amazon/aws/operators/test_ecs.py
+++ b/tests/providers/amazon/aws/operators/test_ecs.py
@@ -99,6 +99,7 @@ class EcsBaseTestCase:
     def setup_test_cases(self, monkeypatch):
         self.client = boto3.client("ecs", region_name="eu-west-3")
         monkeypatch.setattr(EcsHook, "conn", self.client)
+        monkeypatch.setenv("AIRFLOW_CONN_AWS_TEST_CONN", '{"conn_type": 
"aws"}')
 
 
 class TestEcsBaseOperator(EcsBaseTestCase):
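Several hunks above pre-create AWS connections through environment variables instead of the metadata database. Airflow resolves a connection id from an `AIRFLOW_CONN_<CONN_ID>` variable whose value can be a connection URI or, in newer Airflow versions, a JSON document; a short sketch (connection ids are examples, not taken from the commit):

    # Sketch of defining Airflow connections via environment variables.
    # Connection ids below are examples, not taken from the commit.
    import os

    os.environ["AIRFLOW_CONN_AWS_DEFAULT"] = '{"conn_type": "aws"}'  # JSON form
    os.environ["AIRFLOW_CONN_MY_AWS"] = "aws://"                     # URI form

    # from airflow.hooks.base import BaseHook
    # BaseHook.get_connection("aws_default")  # resolved from the env var, no DB lookup
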
diff --git a/tests/providers/amazon/conftest.py b/tests/providers/amazon/conftest.py
index 4ea7f429bc..ca31f1b9ff 100644
--- a/tests/providers/amazon/conftest.py
+++ b/tests/providers/amazon/conftest.py
@@ -102,3 +102,11 @@ def set_default_aws_settings(aws_testing_env_vars, monkeypatch):
             monkeypatch.delenv(env_name, raising=False)
     for env_name, value in aws_testing_env_vars.items():
         monkeypatch.setenv(env_name, value)
+
+
[email protected](scope="package", autouse=True)
+def setup_default_aws_connections():
+    with pytest.MonkeyPatch.context() as mp_ctx:
+        mp_ctx.setenv("AIRFLOW_CONN_AWS_DEFAULT", '{"conn_type": "aws"}')
+        mp_ctx.setenv("AIRFLOW_CONN_EMR_DEFAULT", '{"conn_type": "emr", 
"extra": {}}')
+        yield
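Because the built-in `monkeypatch` fixture is function-scoped, the package- and module-scoped fixtures above drive `pytest.MonkeyPatch.context()` directly; a minimal sketch of that pattern (the fixture and variable names are illustrative):

    # Minimal sketch: environment patching inside a wider-scoped fixture,
    # since the built-in `monkeypatch` fixture cannot be used outside function scope.
    import pytest

    @pytest.fixture(scope="module", autouse=True)
    def _module_level_env():
        with pytest.MonkeyPatch.context() as mp:
            mp.setenv("SOME_ENV_VAR", "value")  # illustrative variable
            yield
        # Leaving the context restores the original environment.
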
diff --git a/tests/providers/apache/hive/transfers/test_s3_to_hive.py b/tests/providers/apache/hive/transfers/test_s3_to_hive.py
index 3e6248f271..5f738f4b54 100644
--- a/tests/providers/apache/hive/transfers/test_s3_to_hive.py
+++ b/tests/providers/apache/hive/transfers/test_s3_to_hive.py
@@ -37,6 +37,7 @@ moto = pytest.importorskip("moto")
 logger = logging.getLogger(__name__)
 
 
[email protected]_test
 class TestS3ToHiveTransfer:
     @pytest.fixture(autouse=True)
     def setup_attrs(self):
diff --git a/tests/providers/cncf/kubernetes/operators/test_job.py b/tests/providers/cncf/kubernetes/operators/test_job.py
index a21dbeebd6..d776da2482 100644
--- a/tests/providers/cncf/kubernetes/operators/test_job.py
+++ b/tests/providers/cncf/kubernetes/operators/test_job.py
@@ -78,6 +78,7 @@ def create_context(task, persist_to_db=False, map_index=None):
     }
 
 
[email protected]_test
 @pytest.mark.execution_timeout(300)
 class TestKubernetesJobOperator:
     @pytest.fixture(autouse=True)
@@ -89,7 +90,6 @@ class TestKubernetesJobOperator:
 
         patch.stopall()
 
-    @pytest.mark.db_test
     def test_templates(self, create_task_instance_of_operator):
         dag_id = "TestKubernetesJobOperator"
         ti = create_task_instance_of_operator(
@@ -465,6 +465,7 @@ class TestKubernetesJobOperator:
         job = k.build_job_request_obj({})
         assert re.match(r"job-a-very-reasonable-task-name-[a-z0-9-]+", 
job.metadata.name) is not None
 
+    @pytest.mark.non_db_test_override
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
     @patch(HOOK_CLASS)
@@ -496,6 +497,7 @@ class TestKubernetesJobOperator:
         assert execute_result is None
         assert not mock_hook.wait_until_job_complete.called
 
+    @pytest.mark.non_db_test_override
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.execute_deferrable"))
@@ -531,6 +533,7 @@ class TestKubernetesJobOperator:
         assert actual_result is None
         assert not mock_hook.wait_until_job_complete.called
 
+    @pytest.mark.non_db_test_override
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
     @patch(HOOK_CLASS)
@@ -545,6 +548,7 @@ class TestKubernetesJobOperator:
         with pytest.raises(AirflowException):
             op.execute(context=dict(ti=mock.MagicMock()))
 
+    @pytest.mark.non_db_test_override
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.defer"))
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobTrigger"))
     def test_execute_deferrable(self, mock_trigger, mock_execute_deferrable):
@@ -587,6 +591,7 @@ class TestKubernetesJobOperator:
         )
         assert actual_result is None
 
+    @pytest.mark.non_db_test_override
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
     @patch(f"{HOOK_CLASS}.wait_until_job_complete")
@@ -609,6 +614,7 @@ class TestKubernetesJobOperator:
             job_poll_interval=POLL_INTERVAL,
         )
 
+    @pytest.mark.non_db_test_override
     def test_execute_complete(self):
         mock_ti = mock.MagicMock()
         context = {"ti": mock_ti}
@@ -619,6 +625,7 @@ class TestKubernetesJobOperator:
 
         mock_ti.xcom_push.assert_called_once_with(key="job", value=mock_job)
 
+    @pytest.mark.non_db_test_override
     def test_execute_complete_fail(self):
         mock_ti = mock.MagicMock()
         context = {"ti": mock_ti}
@@ -630,6 +637,7 @@ class TestKubernetesJobOperator:
 
         mock_ti.xcom_push.assert_called_once_with(key="job", value=mock_job)
 
+    @pytest.mark.non_db_test_override
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.client"))
     @patch(HOOK_CLASS)
     def test_on_kill(self, mock_hook, mock_client):
@@ -650,6 +658,7 @@ class TestKubernetesJobOperator:
         )
         mock_serialize.assert_called_once_with(mock_job)
 
+    @pytest.mark.non_db_test_override
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.client"))
     @patch(HOOK_CLASS)
     def test_on_kill_termination_grace_period(self, mock_hook, mock_client):
@@ -674,6 +683,7 @@ class TestKubernetesJobOperator:
         )
         mock_serialize.assert_called_once_with(mock_job)
 
+    @pytest.mark.non_db_test_override
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.client"))
     @patch(HOOK_CLASS)
     def test_on_kill_none_job(self, mock_hook, mock_client):
@@ -686,6 +696,7 @@ class TestKubernetesJobOperator:
         mock_serialize.assert_not_called()
 
 
[email protected]_test
 @pytest.mark.execution_timeout(300)
 class TestKubernetesDeleteJobOperator:
     @pytest.fixture(autouse=True)
@@ -824,6 +835,7 @@ class TestKubernetesPatchJobOperator:
 
         patch.stopall()
 
+    @pytest.mark.db_test
     @patch("kubernetes.config.load_kube_config")
     @patch("kubernetes.client.api.BatchV1Api.patch_namespaced_job")
     def test_update_execute(self, mock_patch_namespaced_job, mock_load_kube_config):
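The `db_test` and `non_db_test_override` markers applied throughout this file are Airflow-specific pytest markers; their registration and handling live in Airflow's own test configuration and are not shown in this diff. In general, a custom marker is declared and used like the generic sketch below (not Airflow's actual configuration):

    # Generic sketch of registering and using a custom pytest marker;
    # this is not Airflow's actual configuration.

    # conftest.py
    def pytest_configure(config):
        config.addinivalue_line("markers", "db_test: test requires database access")

    # test_example.py
    import pytest

    @pytest.mark.db_test
    def test_needs_database():
        assert True
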
diff --git a/tests/providers/cncf/kubernetes/operators/test_resource.py b/tests/providers/cncf/kubernetes/operators/test_resource.py
index 4a32d51d0e..d2cef33d84 100644
--- a/tests/providers/cncf/kubernetes/operators/test_resource.py
+++ b/tests/providers/cncf/kubernetes/operators/test_resource.py
@@ -76,6 +76,7 @@ metadata:
 HOOK_CLASS = "airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook"
 
 
[email protected]_test
 @patch("airflow.utils.context.Context")
 class TestKubernetesXResourceOperator:
     @pytest.fixture(autouse=True)
diff --git a/tests/providers/google/cloud/operators/test_datapipeline.py b/tests/providers/google/cloud/operators/test_datapipeline.py
index 6f0af0a848..2a948edf78 100644
--- a/tests/providers/google/cloud/operators/test_datapipeline.py
+++ b/tests/providers/google/cloud/operators/test_datapipeline.py
@@ -82,6 +82,7 @@ class TestCreateDataPipelineOperator:
             project_id=TEST_PROJECTID, body=TEST_BODY, location=TEST_LOCATION
         )
 
+    @pytest.mark.db_test
     def test_body_invalid(self):
         """
         Test that if the operator is not passed a Request Body, an AirflowException is raised
@@ -124,6 +125,7 @@ class TestCreateDataPipelineOperator:
         with pytest.raises(AirflowException):
             CreateDataPipelineOperator(**init_kwargs).execute(mock.MagicMock())
 
+    @pytest.mark.db_test
     def test_response_invalid(self):
         """
         Test that if the Response Body contains an error message, an AirflowException is raised
@@ -139,6 +141,7 @@ class TestCreateDataPipelineOperator:
             CreateDataPipelineOperator(**init_kwargs).execute(mock.MagicMock())
 
 
[email protected]_test
 class TestRunDataPipelineOperator:
     @pytest.fixture
     def run_operator(self):
diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
index 4ed8076ce0..442cdde68e 100644
--- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
@@ -493,6 +493,7 @@ class TestGKEStartKueueInsideClusterOperator:
         self.gke_op._cluster_url = CLUSTER_URL
         self.gke_op._ssl_ca_cert = SSL_CA_CERT
 
+    @pytest.mark.db_test
     @mock.patch.dict(os.environ, {})
     @mock.patch(TEMP_FILE)
     @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
diff --git a/tests/providers/google/cloud/operators/test_vertex_ai.py b/tests/providers/google/cloud/operators/test_vertex_ai.py
index 2e0cee39b9..6004a1bd43 100644
--- a/tests/providers/google/cloud/operators/test_vertex_ai.py
+++ b/tests/providers/google/cloud/operators/test_vertex_ai.py
@@ -1568,6 +1568,7 @@ class TestVertexAICreateHyperparameterTuningJobOperator:
         op.execute(context={"ti": mock.MagicMock()})
         mock_defer.assert_called_once()
 
+    @pytest.mark.db_test
     def test_deferrable_sync_error(self):
         op = CreateHyperparameterTuningJobOperator(
             task_id=TASK_ID,
diff --git a/tests/providers/mysql/hooks/test_mysql.py b/tests/providers/mysql/hooks/test_mysql.py
index 271e249193..e6e8bd6ca5 100644
--- a/tests/providers/mysql/hooks/test_mysql.py
+++ b/tests/providers/mysql/hooks/test_mysql.py
@@ -168,8 +168,9 @@ class TestMySqlHookConn:
 
     @mock.patch("MySQLdb.connect")
     
@mock.patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook.get_client_type")
-    def test_get_conn_rds_iam(self, mock_client, mock_connect):
-        self.connection.extra = '{"iam":true}'
+    def test_get_conn_rds_iam(self, mock_client, mock_connect, monkeypatch):
+        monkeypatch.setenv("AIRFLOW_CONN_TEST_AWS_IAM_CONN", '{"conn_type": 
"aws"}')
+        self.connection.extra = '{"iam":true, "aws_conn_id": 
"test_aws_iam_conn"}'
         mock_client.return_value.generate_db_auth_token.return_value = 
"aws_token"
         self.db_hook.get_conn()
         mock_connect.assert_called_once_with(
diff --git a/tests/providers/samba/hooks/test_samba.py b/tests/providers/samba/hooks/test_samba.py
index f6be5b52cd..f1d8fbbfb0 100644
--- a/tests/providers/samba/hooks/test_samba.py
+++ b/tests/providers/samba/hooks/test_samba.py
@@ -22,7 +22,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowNotFoundException
 from airflow.models import Connection
 from airflow.providers.samba.hooks.samba import SambaHook
 
@@ -37,9 +37,10 @@ CONNECTION = Connection(
 
 
 class TestSambaHook:
+    @pytest.mark.db_test
     def test_get_conn_should_fail_if_conn_id_does_not_exist(self):
-        with pytest.raises(AirflowException):
-            SambaHook("conn")
+        with pytest.raises(AirflowNotFoundException):
+            SambaHook("non-existed-connection-id")
 
     @mock.patch("smbclient.register_session")
     @mock.patch("airflow.hooks.base.BaseHook.get_connection")
diff --git a/tests/providers/slack/hooks/test_slack_webhook.py b/tests/providers/slack/hooks/test_slack_webhook.py
index 0af09e3068..cf2d406533 100644
--- a/tests/providers/slack/hooks/test_slack_webhook.py
+++ b/tests/providers/slack/hooks/test_slack_webhook.py
@@ -123,6 +123,14 @@ def slack_webhook_connections():
             schema="http",
             host="some.netloc",
         ),
+        # Not supported anymore
+        Connection(conn_id="conn_token_in_host_1", conn_type=CONN_TYPE, 
host=TEST_WEBHOOK_URL),
+        Connection(
+            conn_id="conn_token_in_host_2",
+            conn_type=CONN_TYPE,
+            schema="https",
+            
host="hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
+        ),
     ]
     with pytest.MonkeyPatch.context() as mp:
         for conn in connections:
@@ -196,9 +204,9 @@ class TestSlackWebhookHook:
 
     @pytest.mark.parametrize("conn_id", ["conn_token_in_host_1", 
"conn_token_in_host_2"])
     def test_wrong_connections(self, conn_id):
-        """Test previously valid connections, but now it is dropped."""
+        """Test previously valid connections, but now support of it is 
dropped."""
         hook = SlackWebhookHook(slack_webhook_conn_id=conn_id)
-        with pytest.raises(AirflowNotFoundException):
+        with pytest.raises(AirflowNotFoundException, match="does not contain password"):
             hook._get_conn_params()
 
     @pytest.mark.parametrize(
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 73d55e28da..992d237aac 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -384,10 +384,10 @@ def timetable_plugin(monkeypatch):
 class TestStringifiedDAGs:
     """Unit tests for stringified DAGs."""
 
-    def setup_method(self):
-        self.backup_base_hook_get_connection = BaseHook.get_connection
-        BaseHook.get_connection = mock.Mock(
-            return_value=Connection(
+    @pytest.fixture(autouse=True)
+    def setup_test_cases(self):
+        with mock.patch.object(BaseHook, "get_connection") as m:
+            m.return_value = Connection(
                 extra=(
                     "{"
                     '"project_id": "mock", '
@@ -399,11 +399,6 @@ class TestStringifiedDAGs:
                     "}"
                 )
             )
-        )
-        self.maxDiff = None
-
-    def teardown_method(self):
-        BaseHook.get_connection = self.backup_base_hook_get_connection
 
     @pytest.mark.db_test
     def test_serialization(self):
@@ -422,6 +417,7 @@ class TestStringifiedDAGs:
         )
         assert actual == expected
 
+    @pytest.mark.db_test
     @pytest.mark.parametrize(
         "timetable, serialized_timetable",
         [
@@ -458,6 +454,7 @@ class TestStringifiedDAGs:
                 print(task["task_id"], k, v)
         assert actual == expected
 
+    @pytest.mark.db_test
     def test_dag_serialization_preserves_empty_access_roles(self):
         """Verify that an explicitly empty access_control dict is preserved."""
         dag = collect_dags(["airflow/example_dags"])["simple_dag"]
@@ -467,6 +464,7 @@ class TestStringifiedDAGs:
 
         assert serialized_dag["dag"]["_access_control"] == {"__type": "dict", 
"__var": {}}
 
+    @pytest.mark.db_test
     def test_dag_serialization_unregistered_custom_timetable(self):
         """Verify serialization fails without timetable registration."""
         dag = get_timetable_based_simple_dag(CustomSerializationTimetable("bar"))
@@ -506,6 +504,7 @@ class TestStringifiedDAGs:
         expected = json.loads(json.dumps(sorted_serialized_dag(expected)))
         return actual, expected
 
+    @pytest.mark.db_test
     def test_deserialization_across_process(self):
         """A serialized DAG can be deserialized in another process."""
 
@@ -533,6 +532,7 @@ class TestStringifiedDAGs:
         for dag_id in stringified_dags:
             self.validate_deserialized_dag(stringified_dags[dag_id], dags[dag_id])
 
+    @pytest.mark.db_test
     def test_roundtrip_provider_example_dags(self):
         dags = collect_dags(
             [
@@ -546,6 +546,7 @@ class TestStringifiedDAGs:
             serialized_dag = SerializedDAG.from_json(SerializedDAG.to_json(dag))
             self.validate_deserialized_dag(serialized_dag, dag)
 
+    @pytest.mark.db_test
     @pytest.mark.parametrize(
         "timetable",
         [cron_timetable("0 0 * * *"), CustomSerializationTimetable("foo")],
diff --git a/tests/utils/test_cli_util.py b/tests/utils/test_cli_util.py
index 03056d3e20..9d431ace59 100644
--- a/tests/utils/test_cli_util.py
+++ b/tests/utils/test_cli_util.py
@@ -35,6 +35,10 @@ from airflow.models.log import Log
 from airflow.utils import cli, cli_action_loggers, timezone
 from airflow.utils.cli import _search_for_dag_file, get_dag_by_pickle
 
+# Mark entire module as db_test because ``action_cli`` wrapper still could use DB on callbacks:
+# - ``cli_action_loggers.on_pre_execution``
+# - ``cli_action_loggers.on_post_execution``
+pytestmark = pytest.mark.db_test
 repo_root = Path(airflow.__file__).parent.parent
 
 
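A module-level `pytestmark`, as added to test_cli_util.py above, applies the marker to every test in the module; a small generic sketch:

    # Generic sketch: module-level `pytestmark` marks all tests in the module.
    import pytest

    pytestmark = pytest.mark.db_test  # or a list, e.g. [pytest.mark.db_test]

    def test_one():
        assert True

    def test_two():
        assert True
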
diff --git a/tests/utils/test_email.py b/tests/utils/test_email.py
index f5b6805de3..2cdff5baa8 100644
--- a/tests/utils/test_email.py
+++ b/tests/utils/test_email.py
@@ -145,6 +145,7 @@ class TestEmail:
         assert msg["To"] == ",".join(recipients)
 
 
[email protected]_test
 class TestEmailSmtp:
     @mock.patch("airflow.utils.email.send_mime_email")
     def test_send_smtp(self, mock_send_mime, tmp_path):
