This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a0c2071834 Partially enable PT012 rule (#38219)
a0c2071834 is described below
commit a0c2071834506f2342e9ac88c65003510a5efde8
Author: Andrey Anshin <[email protected]>
AuthorDate: Mon Mar 18 22:03:54 2024 +0400
Partially enable PT012 rule (#38219)
* Partially enable PT012 rule
* Fixup 'tests/www/test_app.py' tests
* Fixup
'tests/decorators/test_bash.py::TestBashDecorator::test_cwd_is_file' tests
---
pyproject.toml | 77 +++++++++++++++++++++++-
tests/cli/commands/test_task_command.py | 3 +-
tests/cli/test_cli_parser.py | 7 ++-
tests/core/test_stats.py | 13 ++--
tests/dag_processing/test_job_runner.py | 32 +++++-----
tests/decorators/test_bash.py | 48 ++++++++-------
tests/decorators/test_setup_teardown.py | 8 ++-
tests/jobs/test_backfill_job.py | 6 +-
tests/jobs/test_triggerer_job.py | 2 +-
tests/models/test_baseoperator.py | 49 +++++++--------
tests/models/test_dag.py | 16 +++--
tests/models/test_param.py | 4 +-
tests/models/test_taskinstance.py | 2 +-
tests/models/test_taskmixin.py | 1 -
tests/operators/test_python.py | 9 ++-
tests/operators/test_subdag_operator.py | 4 +-
tests/operators/test_trigger_dagrun.py | 7 +--
tests/security/test_kerberos.py | 10 ++--
tests/sensors/test_external_task_sensor.py | 32 +++++-----
tests/sensors/test_filesystem.py | 9 ++-
tests/serialization/test_serde.py | 6 +-
tests/utils/test_sqlalchemy.py | 4 +-
tests/utils/test_task_group.py | 42 +++++++------
tests/www/test_app.py | 96 +++++++++++-------------------
24 files changed, 263 insertions(+), 224 deletions(-)
diff --git a/pyproject.toml b/pyproject.toml
index 97aceac2af..6114cf93d5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1360,7 +1360,6 @@ ignore = [
"PT007", # Wrong type of values in @pytest.mark.parametrize
"PT008", # Use return_value= instead of patching with lambda
"PT011", # pytest.raises() is too broad, set the match parameter
- "PT012", # [controversial rule] pytest.raises() block should contain a
single simple statement.
"PT018", # assertion should be broken down into multiple parts
"PT019", # fixture without value is injected as parameter, use
@pytest.mark.usefixtures instead
]
@@ -1405,7 +1404,7 @@ combine-as-imports = true
"dev/breeze/tests/*" = ["TID253", "S101"]
"tests/*" = ["D", "TID253", "S101"]
"docker_tests/*" = ["D", "TID253", "S101"]
-"kubernetes_tests/*" = ["D", "TID253", "S101"]
+"kubernetes_tests/*" = ["D", "TID253", "S101", "PT012"]
"helm_tests/*" = ["D", "TID253", "S101"]
# All of the modules which have an extra license header (i.e. that we copy
from another project) need to
@@ -1417,7 +1416,7 @@ combine-as-imports = true
"tests/providers/elasticsearch/log/elasticmock/__init__.py" = ["E402"]
"tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py" =
["E402"]
"tests/providers/openai/hooks/test_openai.py" = ["E402"]
-"tests/providers/openai/operators/test_openai.py" = ["E402"]
+"tests/providers/openai/operators/test_openai.py" = ["E402", "PT012"]
"tests/providers/qdrant/hooks/test_qdrant.py" = ["E402"]
"tests/providers/qdrant/operators/test_qdrant.py" = ["E402"]
"tests/providers/snowflake/operators/test_snowflake_sql.py" = ["E402"]
@@ -1488,6 +1487,78 @@ combine-as-imports = true
"airflow/providers/smtp/hooks/smtp.py" = ["D105"]
"airflow/providers/tableau/hooks/tableau.py" = ["D105"]
+# All the test modules which do not follow PT012 yet
+"tests/providers/amazon/aws/hooks/test_base_aws.py" = ["PT012"]
+"tests/providers/amazon/aws/hooks/test_datasync.py" = ["PT012"]
+"tests/providers/amazon/aws/hooks/test_eks.py" = ["PT012"]
+"tests/providers/amazon/aws/hooks/test_redshift_data.py" = ["PT012"]
+"tests/providers/amazon/aws/hooks/test_s3.py" = ["PT012"]
+"tests/providers/amazon/aws/operators/test_emr_serverless.py" = ["PT012"]
+"tests/providers/amazon/aws/operators/test_redshift_data.py" = ["PT012"]
+"tests/providers/amazon/aws/sensors/test_glacier.py" = ["PT012"]
+"tests/providers/amazon/aws/sensors/test_glue.py" = ["PT012"]
+"tests/providers/amazon/aws/sensors/test_lambda_function.py" = ["PT012"]
+"tests/providers/amazon/aws/system/utils/test_helpers.py" = ["PT012"]
+"tests/providers/amazon/aws/transfers/test_redshift_to_s3.py" = ["PT012"]
+"tests/providers/amazon/aws/triggers/test_ecs.py" = ["PT012"]
+"tests/providers/amazon/aws/waiters/test_neptune.py" = ["PT012"]
+"tests/providers/apache/beam/hooks/test_beam.py" = ["PT012"]
+"tests/providers/apache/hive/hooks/test_hive.py" = ["PT012"]
+"tests/providers/apache/hive/sensors/test_named_hive_partition.py" = ["PT012"]
+"tests/providers/apache/spark/hooks/test_spark_sql.py" = ["PT012"]
+"tests/providers/celery/sensors/test_celery_queue.py" = ["PT012"]
+"tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py" =
["PT012"]
+"tests/providers/cncf/kubernetes/hooks/test_kubernetes.py" = ["PT012"]
+"tests/providers/cncf/kubernetes/operators/test_pod.py" = ["PT012"]
+"tests/providers/cncf/kubernetes/utils/test_k8s_resource_iterator.py" =
["PT012"]
+"tests/providers/cncf/kubernetes/utils/test_pod_manager.py" = ["PT012"]
+"tests/providers/common/sql/operators/test_sql.py" = ["PT012"]
+"tests/providers/databricks/hooks/test_databricks.py" = ["PT012"]
+"tests/providers/databricks/operators/test_databricks.py" = ["PT012"]
+"tests/providers/databricks/operators/test_databricks_repos.py" = ["PT012"]
+"tests/providers/databricks/sensors/test_databricks_partition.py" = ["PT012"]
+"tests/providers/datadog/sensors/test_datadog.py" = ["PT012"]
+"tests/providers/dbt/cloud/operators/test_dbt.py" = ["PT012"]
+"tests/providers/fab/auth_manager/cli_commands/test_user_command.py" =
["PT012"]
+"tests/providers/ftp/operators/test_ftp.py" = ["PT012"]
+"tests/providers/google/cloud/hooks/test_bigquery.py" = ["PT012"]
+"tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service.py" =
["PT012"]
+"tests/providers/google/cloud/hooks/test_dataflow.py" = ["PT012"]
+"tests/providers/google/cloud/hooks/test_dataprep.py" = ["PT012"]
+"tests/providers/google/cloud/hooks/test_kubernetes_engine.py" = ["PT012"]
+"tests/providers/google/cloud/hooks/test_pubsub.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_bigtable.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_cloud_sql.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_cloud_storage_transfer_service.py"
= ["PT012"]
+"tests/providers/google/cloud/operators/test_compute.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_datafusion.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_dataproc.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_functions.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_kubernetes_engine.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_mlengine.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_spanner.py" = ["PT012"]
+"tests/providers/google/cloud/sensors/test_datafusion.py" = ["PT012"]
+"tests/providers/google/cloud/sensors/test_dataproc.py" = ["PT012"]
+"tests/providers/google/cloud/sensors/test_gcs.py" = ["PT012"]
+"tests/providers/google/cloud/sensors/test_pubsub.py" = ["PT012"]
+"tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py" = ["PT012"]
+"tests/providers/google/cloud/utils/test_credentials_provider.py" = ["PT012"]
+"tests/providers/google/common/hooks/test_base_google.py" = ["PT012"]
+"tests/providers/jenkins/sensors/test_jenkins.py" = ["PT012"]
+"tests/providers/microsoft/azure/hooks/test_data_factory.py" = ["PT012"]
+"tests/providers/microsoft/azure/hooks/test_wasb.py" = ["PT012"]
+"tests/providers/microsoft/psrp/hooks/test_psrp.py" = ["PT012"]
+"tests/providers/oracle/hooks/test_oracle.py" = ["PT012"]
+"tests/providers/papermill/operators/test_papermill.py" = ["PT012"]
+"tests/providers/sftp/hooks/test_sftp.py" = ["PT012"]
+"tests/providers/sftp/operators/test_sftp.py" = ["PT012"]
+"tests/providers/sftp/sensors/test_sftp.py" = ["PT012"]
+"tests/providers/sftp/triggers/test_sftp.py" = ["PT012"]
+"tests/providers/ssh/hooks/test_ssh.py" = ["PT012"]
+"tests/providers/ssh/operators/test_ssh.py" = ["PT012"]
+"tests/providers/telegram/hooks/test_telegram.py" = ["PT012"]
+"tests/providers/telegram/operators/test_telegram.py" = ["PT012"]
+
[tool.ruff.lint.flake8-tidy-imports]
# Ban certain modules from being imported at module level, instead requiring
# that they're imported lazily (e.g., within a function definition).
diff --git a/tests/cli/commands/test_task_command.py
b/tests/cli/commands/test_task_command.py
index 24c6c950d2..d345075213 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -619,14 +619,13 @@ class TestCliTasks:
task_states_for_dag_run should return an AirflowException when invalid
dag id is passed
"""
with pytest.raises(DagRunNotFound):
- default_date2 = timezone.datetime(2016, 1, 9)
task_command.task_states_for_dag_run(
self.parser.parse_args(
[
"tasks",
"states-for-dag-run",
"not_exists_dag",
- default_date2.isoformat(),
+ timezone.datetime(2016, 1, 9).isoformat(),
"--output",
"json",
]
diff --git a/tests/cli/test_cli_parser.py b/tests/cli/test_cli_parser.py
index f8c1d53951..d105b01614 100644
--- a/tests/cli/test_cli_parser.py
+++ b/tests/cli/test_cli_parser.py
@@ -155,9 +155,9 @@ class TestCli:
args=[],
)
]
+ reload(executor_loader)
with pytest.raises(CliConflictError, match="test_command"):
# force re-evaluation of cli commands (done in top level code)
- reload(executor_loader)
reload(cli_parser)
def test_falsy_default_value(self):
@@ -229,6 +229,7 @@ class TestCli:
with pytest.raises(argparse.ArgumentTypeError):
cli_config.positive_int(allow_zero=False)("0")
+ with pytest.raises(argparse.ArgumentTypeError):
cli_config.positive_int(allow_zero=True)("-1")
@pytest.mark.parametrize(
@@ -285,8 +286,8 @@ class TestCli:
def
test_non_existing_directory_raises_when_metavar_is_dir_for_db_export_cleaned(self):
"""Test that the error message is correct when the directory does not
exist."""
with contextlib.redirect_stderr(StringIO()) as stderr:
+ parser = cli_parser.get_parser()
with pytest.raises(SystemExit):
- parser = cli_parser.get_parser()
parser.parse_args(["db", "export-archived", "--output-path",
"/non/existing/directory"])
error_msg = stderr.getvalue()
@@ -302,8 +303,8 @@ class TestCli:
):
"""Test that invalid choice raises for export-format in db
export-cleaned command."""
with contextlib.redirect_stderr(StringIO()) as stderr:
+ parser = cli_parser.get_parser()
with pytest.raises(SystemExit):
- parser = cli_parser.get_parser()
parser.parse_args(
["db", "export-archived", "--export-format",
export_format, "--output-path", "mydir"]
)
diff --git a/tests/core/test_stats.py b/tests/core/test_stats.py
index ddd69ca32a..2c167d5d1e 100644
--- a/tests/core/test_stats.py
+++ b/tests/core/test_stats.py
@@ -124,15 +124,14 @@ class TestStats:
("metrics", "statsd_on"): "True",
("metrics", "statsd_custom_client_path"):
f"{__name__}.InvalidCustomStatsd",
}
- ), pytest.raises(
- AirflowConfigException,
- match=re.escape(
- "Your custom StatsD client must extend the statsd."
- "StatsClient in order to ensure backwards compatibility."
- ),
):
importlib.reload(airflow.stats)
- airflow.stats.Stats.incr("empty_key")
+ error_message = re.escape(
+ "Your custom StatsD client must extend the statsd."
+ "StatsClient in order to ensure backwards compatibility."
+ )
+ with pytest.raises(AirflowConfigException, match=error_message):
+ airflow.stats.Stats.incr("empty_key")
importlib.reload(airflow.stats)
def test_load_allow_list_validator(self):
diff --git a/tests/dag_processing/test_job_runner.py
b/tests/dag_processing/test_job_runner.py
index b7b24bc1b2..d77f7692d3 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -1461,17 +1461,17 @@ class TestDagFileProcessorAgent:
assert os.path.isfile(log_file_loc)
def test_single_parsing_loop_no_parent_signal_conn(self):
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365),
[], False, False)
+ processor_agent._process = Mock()
+ processor_agent._parent_signal_conn = None
with pytest.raises(ValueError, match="Process not started"):
- processor_agent = DagFileProcessorAgent("", 1,
timedelta(days=365), [], False, False)
- processor_agent._process = Mock()
- processor_agent._parent_signal_conn = None
processor_agent.run_single_parsing_loop()
def test_single_parsing_loop_no_process(self):
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365),
[], False, False)
+ processor_agent._parent_signal_conn = Mock()
+ processor_agent._process = None
with pytest.raises(ValueError, match="Process not started"):
- processor_agent = DagFileProcessorAgent("", 1,
timedelta(days=365), [], False, False)
- processor_agent._parent_signal_conn = Mock()
- processor_agent._process = None
processor_agent.run_single_parsing_loop()
def test_single_parsing_loop_process_isnt_alive(self):
@@ -1498,15 +1498,15 @@ class TestDagFileProcessorAgent:
assert retval == processor_agent._parent_signal_conn
def test_get_callbacks_pipe_no_parent_signal_conn(self):
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365),
[], False, False)
+ processor_agent._parent_signal_conn = None
with pytest.raises(ValueError, match="Process not started"):
- processor_agent = DagFileProcessorAgent("", 1,
timedelta(days=365), [], False, False)
- processor_agent._parent_signal_conn = None
processor_agent.get_callbacks_pipe()
def test_wait_until_finished_no_parent_signal_conn(self):
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365),
[], False, False)
+ processor_agent._parent_signal_conn = None
with pytest.raises(ValueError, match="Process not started"):
- processor_agent = DagFileProcessorAgent("", 1,
timedelta(days=365), [], False, False)
- processor_agent._parent_signal_conn = None
processor_agent.wait_until_finished()
def test_wait_until_finished_poll_eof_error(self):
@@ -1519,9 +1519,9 @@ class TestDagFileProcessorAgent:
assert ret_val is None
def test_heartbeat_no_parent_signal_conn(self):
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365),
[], False, False)
+ processor_agent._parent_signal_conn = None
with pytest.raises(ValueError, match="Process not started"):
- processor_agent = DagFileProcessorAgent("", 1,
timedelta(days=365), [], False, False)
- processor_agent._parent_signal_conn = None
processor_agent.heartbeat()
def test_heartbeat_poll_eof_error(self):
@@ -1553,15 +1553,15 @@ class TestDagFileProcessorAgent:
processor_agent._process_message.assert_called_with("testelem")
def test_process_message_invalid_type(self):
+ message = "xyz"
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365),
[], False, False)
with pytest.raises(RuntimeError, match="Unexpected message received of
type str"):
- message = "xyz"
- processor_agent = DagFileProcessorAgent("", 1,
timedelta(days=365), [], False, False)
processor_agent._process_message(message)
def test_heartbeat_manager(self):
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365),
[], False, False)
+ processor_agent._parent_signal_conn = None
with pytest.raises(ValueError, match="Process not started"):
- processor_agent = DagFileProcessorAgent("", 1,
timedelta(days=365), [], False, False)
- processor_agent._parent_signal_conn = None
processor_agent._heartbeat_manager()
@mock.patch("airflow.utils.process_utils.reap_process_group")
diff --git a/tests/decorators/test_bash.py b/tests/decorators/test_bash.py
index 612e6a3327..1f2eb822d7 100644
--- a/tests/decorators/test_bash.py
+++ b/tests/decorators/test_bash.py
@@ -339,14 +339,15 @@ class TestBashDecorator:
assert bash_task.operator.bash_command == NOTSET
+ dr = self.dag_maker.create_dagrun()
+ ti = dr.task_instances[0]
with pytest.raises(AirflowException, match=f"Can not find the cwd:
{cwd_path}"):
- ti, _ = self.execute_task(bash_task)
-
- self.validate_bash_command_rtif(ti, "echo")
+ ti.run()
+ assert ti.task.bash_command == "echo"
def test_cwd_is_file(self, tmp_path):
"""Verify task failure for user-defined working directory that is
actually a file."""
- cwd_file = tmp_path / "test_file.sh"
+ cwd_file = tmp_path / "testfile.var.env"
cwd_file.touch()
with self.dag:
@@ -359,28 +360,32 @@ class TestBashDecorator:
assert bash_task.operator.bash_command == NOTSET
+ dr = self.dag_maker.create_dagrun()
+ ti = dr.task_instances[0]
with pytest.raises(AirflowException, match=f"The cwd {cwd_file} must
be a directory"):
- ti, _ = self.execute_task(bash_task)
-
- self.validate_bash_command_rtif(ti, "echo")
+ ti.run()
+ assert ti.task.bash_command == "echo"
def test_command_not_found(self):
"""Fail task if executed command is not found on path."""
- with pytest.raises(
- AirflowException, match="Bash command failed\\. The command
returned a non-zero exit code 127\\."
- ):
- with self.dag:
- @task.bash
- def bash():
- return "set -e; something-that-isnt-on-path"
+ with self.dag:
- bash_task = bash()
+ @task.bash
+ def bash():
+ return "set -e; something-that-isnt-on-path"
- assert bash_task.operator.bash_command == NOTSET
+ bash_task = bash()
- ti, _ = self.execute_task(bash_task)
- self.validate_bash_command_rtif(ti, "set -e;
something-that-isnt-on-path")
+ assert bash_task.operator.bash_command == NOTSET
+
+ dr = self.dag_maker.create_dagrun()
+ ti = dr.task_instances[0]
+ with pytest.raises(
+ AirflowException, match="Bash command failed\\. The command
returned a non-zero exit code 127\\."
+ ):
+ ti.run()
+ assert ti.task.bash_command == "set -e; something-that-isnt-on-path"
def test_multiple_outputs_true(self):
"""Verify setting `multiple_outputs` for a @task.bash-decorated
function is ignored."""
@@ -474,7 +479,8 @@ class TestBashDecorator:
assert bash_task.operator.bash_command == NOTSET
+ dr = self.dag_maker.create_dagrun()
+ ti = dr.task_instances[0]
with pytest.raises(AirflowException):
- ti, _ = self.execute_task(bash_task)
-
- self.validate_bash_command_rtif(ti, f"{DEFAULT_DATE.date()}; exit
1;")
+ ti.run()
+ assert ti.task.bash_command == f"{DEFAULT_DATE.date()}; exit 1;"
diff --git a/tests/decorators/test_setup_teardown.py
b/tests/decorators/test_setup_teardown.py
index 80694f47cb..13451ba379 100644
--- a/tests/decorators/test_setup_teardown.py
+++ b/tests/decorators/test_setup_teardown.py
@@ -126,7 +126,7 @@ class TestSetupTearDownTask:
def test_setup_taskgroup_decorator(self, dag_maker):
with dag_maker():
- with pytest.raises(
+ with pytest.raises( # noqa: PT012, check decorators required more
than one line
expected_exception=AirflowException,
match="Task groups cannot be marked as setup or teardown.",
):
@@ -144,7 +144,7 @@ class TestSetupTearDownTask:
def test_teardown_taskgroup_decorator(self, dag_maker):
with dag_maker():
- with pytest.raises(
+ with pytest.raises( # noqa: PT012, check decorators required more
than one line
expected_exception=AirflowException,
match="Task groups cannot be marked as setup or teardown.",
):
@@ -989,7 +989,9 @@ class TestSetupTearDownTask:
def teardowntask():
print("teardown")
- with pytest.raises(ValueError, match="All tasks in the list must be
either setup or teardown tasks"):
+ with pytest.raises( # noqa: PT012, check decorators required more
than one line
+ ValueError, match="All tasks in the list must be either setup or
teardown tasks"
+ ):
with dag_maker():
with setuptask() << context_wrapper([teardowntask(),
setuptask2()]):
mytask()
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 18f6a47ce0..ad80166147 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -1046,10 +1046,10 @@ class TestBackfillJob:
# raises backwards
expected_msg = "You cannot backfill backwards because one or more
tasks depend_on_past: test_dop_task"
+ executor = MockExecutor()
+ job = Job(executor=executor)
+ job_runner = BackfillJobRunner(job=job, dag=dag, run_backwards=True,
**kwargs)
with pytest.raises(AirflowException, match=expected_msg):
- executor = MockExecutor()
- job = Job(executor=executor)
- job_runner = BackfillJobRunner(job=job, dag=dag,
run_backwards=True, **kwargs)
run_job(job=job, execute_callable=job_runner._execute)
def test_cli_receives_delay_arg(self):
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index 1cc69062e4..7ce4f94590 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -213,8 +213,8 @@ def test_capacity_decode():
4 / 2, # Resolves to a float, in addition to being just plain weird
]
for input_str in variants:
+ job = Job()
with pytest.raises(ValueError):
- job = Job()
TriggererJobRunner(job=job, capacity=input_str)
diff --git a/tests/models/test_baseoperator.py
b/tests/models/test_baseoperator.py
index 989e772074..8513d2ebb0 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -310,43 +310,38 @@ class TestBaseOperator:
assert result == expected_output
def test_mapped_dag_slas_disabled_classic(self):
- with pytest.raises(AirflowException, match="SLAs are unsupported with
mapped tasks"):
- with DAG(
- "test-dag", start_date=DEFAULT_DATE,
default_args=dict(sla=timedelta(minutes=30))
- ) as dag:
+ class MyOp(BaseOperator):
+ def __init__(self, x, **kwargs):
+ self.x = x
+ super().__init__(**kwargs)
- @dag.task
- def get_values():
- return [0, 1, 2]
+ def execute(self, context):
+ print(self.x)
- task1 = get_values()
+ with DAG("test-dag", start_date=DEFAULT_DATE,
default_args=dict(sla=timedelta(minutes=30))) as dag:
- class MyOp(BaseOperator):
- def __init__(self, x, **kwargs):
- self.x = x
- super().__init__(**kwargs)
-
- def execute(self, context):
- print(self.x)
+ @dag.task
+ def get_values():
+ return [0, 1, 2]
+ task1 = get_values()
+ with pytest.raises(AirflowException, match="SLAs are unsupported
with mapped tasks"):
MyOp.partial(task_id="hi").expand(x=task1)
def test_mapped_dag_slas_disabled_taskflow(self):
- with pytest.raises(AirflowException, match="SLAs are unsupported with
mapped tasks"):
- with DAG(
- "test-dag", start_date=DEFAULT_DATE,
default_args=dict(sla=timedelta(minutes=30))
- ) as dag:
+ with DAG("test-dag", start_date=DEFAULT_DATE,
default_args=dict(sla=timedelta(minutes=30))) as dag:
- @dag.task
- def get_values():
- return [0, 1, 2]
+ @dag.task
+ def get_values():
+ return [0, 1, 2]
- task1 = get_values()
+ task1 = get_values()
- @dag.task
- def print_val(x):
- print(x)
+ @dag.task
+ def print_val(x):
+ print(x)
+ with pytest.raises(AirflowException, match="SLAs are unsupported
with mapped tasks"):
print_val.expand(x=task1)
@pytest.mark.db_test
@@ -749,8 +744,8 @@ class TestBaseOperator:
assert op1 in op2.upstream_list
def test_set_xcomargs_dependencies_error_when_outside_dag(self):
+ op1 = BaseOperator(task_id="op1")
with pytest.raises(AirflowException):
- op1 = BaseOperator(task_id="op1")
MockOperator(task_id="op2", arg1=op1.output)
def test_invalid_trigger_rule(self):
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 27a4be3ce6..ba60315212 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1533,21 +1533,19 @@ class TestDag:
def test_duplicate_task_ids_not_allowed_with_dag_context_manager(self):
"""Verify tasks with Duplicate task_id raises error"""
- with pytest.raises(DuplicateTaskIdFound, match="Task id 't1' has
already been added to the DAG"):
- with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
- op1 = EmptyOperator(task_id="t1")
- op2 = BashOperator(task_id="t1", bash_command="sleep 1")
- op1 >> op2
+ with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
+ op1 = EmptyOperator(task_id="t1")
+ with pytest.raises(DuplicateTaskIdFound, match="Task id 't1' has
already been added to the DAG"):
+ BashOperator(task_id="t1", bash_command="sleep 1")
assert dag.task_dict == {op1.task_id: op1}
def test_duplicate_task_ids_not_allowed_without_dag_context_manager(self):
"""Verify tasks with Duplicate task_id raises error"""
+ dag = DAG("test_dag", start_date=DEFAULT_DATE)
+ op1 = EmptyOperator(task_id="t1", dag=dag)
with pytest.raises(DuplicateTaskIdFound, match="Task id 't1' has
already been added to the DAG"):
- dag = DAG("test_dag", start_date=DEFAULT_DATE)
- op1 = EmptyOperator(task_id="t1", dag=dag)
- op2 = EmptyOperator(task_id="t1", dag=dag)
- op1 >> op2
+ EmptyOperator(task_id="t1", dag=dag)
assert dag.task_dict == {op1.task_id: op1}
diff --git a/tests/models/test_param.py b/tests/models/test_param.py
index 24a910f01e..89421f0dc2 100644
--- a/tests/models/test_param.py
+++ b/tests/models/test_param.py
@@ -171,8 +171,8 @@ class TestParam:
p = Param(1.2, type="number")
assert p.resolve() == 1.2
+ p = Param("42", type="number")
with pytest.raises(ParamValidationError):
- p = Param("42", type="number")
p.resolve()
def test_list_param(self):
@@ -212,8 +212,8 @@ class TestParam:
p = S3Param("s3://my_bucket/my_path")
assert p.resolve() == "s3://my_bucket/my_path"
+ p = S3Param("file://not_valid/s3_path")
with pytest.raises(ParamValidationError):
- p = S3Param("file://not_valid/s3_path")
p.resolve()
def test_value_saved(self):
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index 0d9d4df0f4..457d311923 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -402,8 +402,8 @@ class TestTaskInstance:
test that try to create a task with pool_slots less than 1
"""
+ dag = DAG(dag_id="test_run_pooling_task")
with pytest.raises(ValueError, match="pool slots .* cannot be less
than 1"):
- dag = DAG(dag_id="test_run_pooling_task")
EmptyOperator(
task_id="test_run_pooling_task_op",
dag=dag,
diff --git a/tests/models/test_taskmixin.py b/tests/models/test_taskmixin.py
index 070483cfef..903f215ce7 100644
--- a/tests/models/test_taskmixin.py
+++ b/tests/models/test_taskmixin.py
@@ -166,7 +166,6 @@ def test_cannot_be_both_setup_and_teardown(dag_maker,
type_):
ValueError, match=f"Cannot mark task 's1' as {second}; task is
already a {first}."
):
getattr(s1, f"as_{second}")()
- s1.as_teardown()
def test_cannot_set_on_failure_fail_dagrun_unless_teardown_classic(dag_maker):
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index a6bf98ee16..3e6f5a8b9a 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -757,9 +757,8 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
def f():
raise Exception("Custom error message")
- with pytest.raises(AirflowException) as e:
+ with pytest.raises(AirflowException, match="Custom error message"):
self.run_as_task(f)
- assert "Custom error message" in str(e)
def test_string_args(self):
def f():
@@ -913,11 +912,11 @@ class
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
def test_virtualenv_not_installed(self, importlib_mock, which_mock):
which_mock.return_value = None
importlib_mock.util.find_spec.return_value = None
- with pytest.raises(AirflowException, match="requires virtualenv"):
- def f():
- pass
+ def f():
+ pass
+ with pytest.raises(AirflowException, match="requires virtualenv"):
self.run_as_task(f)
def test_add_dill(self):
diff --git a/tests/operators/test_subdag_operator.py
b/tests/operators/test_subdag_operator.py
index 88bb701966..dd5157e891 100644
--- a/tests/operators/test_subdag_operator.py
+++ b/tests/operators/test_subdag_operator.py
@@ -229,9 +229,9 @@ class TestSubDagOperator:
"execution_date": DEFAULT_DATE,
}
+ subdag_task.pre_execute(context=context)
+ subdag_task.execute(context=context)
with pytest.raises(AirflowException):
- subdag_task.pre_execute(context=context)
- subdag_task.execute(context=context)
subdag_task.post_execute(context=context)
def test_execute_skip_if_dagrun_success(self):
diff --git a/tests/operators/test_trigger_dagrun.py
b/tests/operators/test_trigger_dagrun.py
index bad16517ea..a90f49926e 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -493,12 +493,11 @@ class TestDagRunOperator:
poll_interval=20,
states=["success", "failed"],
)
- with pytest.raises(AirflowException) as exception:
+ with pytest.raises(AirflowException, match="which is not in"):
task.execute_complete(
context={},
event=trigger.serialize(),
)
- assert "which is not in" in str(exception)
def
test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure_2(self):
"""Test TriggerDagRunOperator wait_for_completion dag run in failed
state."""
@@ -528,7 +527,5 @@ class TestDagRunOperator:
states=["success", "failed"],
)
- with pytest.raises(AirflowException) as exception:
+ with pytest.raises(AirflowException, match="failed with failed state"):
task.execute_complete(context={}, event=trigger.serialize())
-
- assert "failed with failed state" in str(exception)
diff --git a/tests/security/test_kerberos.py b/tests/security/test_kerberos.py
index 4989aeed8d..b424edb820 100644
--- a/tests/security/test_kerberos.py
+++ b/tests/security/test_kerberos.py
@@ -167,8 +167,8 @@ class TestKerberos:
mock_subp.stdout = mock.MagicMock(name="stdout",
**{"readlines.return_value": ["STDOUT"]})
mock_subp.stderr = mock.MagicMock(name="stderr",
**{"readlines.return_value": ["STDERR"]})
+ caplog.clear()
with pytest.raises(SystemExit) as ctx:
- caplog.clear()
renew_from_kt(principal="test-principal", keytab="keytab")
assert ctx.value.code == 1
@@ -216,8 +216,8 @@ class TestKerberos:
mock_subprocess.Popen.return_value.__enter__.return_value.returncode = 0
mock_subprocess.call.return_value = 1
+ caplog.clear()
with pytest.raises(SystemExit) as ctx:
- caplog.clear()
renew_from_kt(principal="test-principal", keytab="keytab")
assert ctx.value.code == 1
@@ -264,9 +264,9 @@ class TestKerberos:
]
def test_run_without_keytab(self, caplog):
- with pytest.raises(SystemExit) as ctx:
- with caplog.at_level(logging.WARNING, logger=kerberos.log.name):
- caplog.clear()
+ with caplog.at_level(logging.WARNING, logger=kerberos.log.name):
+ caplog.clear()
+ with pytest.raises(SystemExit) as ctx:
kerberos.run(principal="test-principal", keytab=None)
assert ctx.value.code == 0
assert caplog.messages == ["Keytab renewer not starting, no keytab
configured"]
diff --git a/tests/sensors/test_external_task_sensor.py
b/tests/sensors/test_external_task_sensor.py
index 8d63a88486..557e4cf00d 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -237,15 +237,15 @@ class TestExternalTaskSensor:
def test_external_task_group_not_exists_without_check_existence(self):
self.add_time_sensor()
self.add_dummy_task_group()
+ op = ExternalTaskSensor(
+ task_id="test_external_task_sensor_check",
+ external_dag_id=TEST_DAG_ID,
+ external_task_group_id="fake-task-group",
+ timeout=0.001,
+ dag=self.dag,
+ poke_interval=0.1,
+ )
with pytest.raises(AirflowException, match="Sensor has timed out"):
- op = ExternalTaskSensor(
- task_id="test_external_task_sensor_check",
- external_dag_id=TEST_DAG_ID,
- external_task_group_id="fake-task-group",
- timeout=0.001,
- dag=self.dag,
- poke_interval=0.1,
- )
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
def test_external_task_group_sensor_success(self):
@@ -320,9 +320,9 @@ class TestExternalTaskSensor:
dag=self.dag,
)
error_message = rf"Some of the external tasks \['{TEST_TASK_ID}'\] in
DAG {TEST_DAG_ID} failed\."
- with pytest.raises(AirflowException, match=error_message):
- with caplog.at_level(logging.INFO, logger=op.log.name):
- caplog.clear()
+ with caplog.at_level(logging.INFO, logger=op.log.name):
+ caplog.clear()
+ with pytest.raises(AirflowException, match=error_message):
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
assert (
f"Poking for tasks ['{TEST_TASK_ID}'] in dag {TEST_DAG_ID} on
{DEFAULT_DATE.isoformat()} ... "
@@ -423,9 +423,9 @@ class TestExternalTaskSensor:
rf"Some of the external tasks \['{TEST_TASK_ID}'\,
\'{TEST_TASK_ID_ALTERNATE}\'] "
rf"in DAG {TEST_DAG_ID} failed\."
)
- with pytest.raises(AirflowException, match=error_message):
- with caplog.at_level(logging.INFO, logger=op.log.name):
- caplog.clear()
+ with caplog.at_level(logging.INFO, logger=op.log.name):
+ caplog.clear()
+ with pytest.raises(AirflowException, match=error_message):
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
assert (
f"Poking for tasks ['{TEST_TASK_ID}', '{TEST_TASK_ID_ALTERNATE}'] "
@@ -725,8 +725,8 @@ exit 0
allowed_states=["success"],
dag=self.dag,
)
+ op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
with pytest.raises(ValueError):
- op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
op2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_catch_duplicate_task_ids_with_multiple_xcom_args(self):
@@ -746,8 +746,8 @@ exit 0
allowed_states=["success"],
dag=self.dag,
)
+ op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
with pytest.raises(ValueError):
- op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
op2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_catch_invalid_allowed_states(self):
diff --git a/tests/sensors/test_filesystem.py b/tests/sensors/test_filesystem.py
index 9158e801ca..3c425a1b43 100644
--- a/tests/sensors/test_filesystem.py
+++ b/tests/sensors/test_filesystem.py
@@ -204,12 +204,12 @@ class TestFileSensor:
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
shutil.rmtree(temp_dir)
- def test_subdirectory_empty(self):
- temp_dir = tempfile.mkdtemp()
- tempfile.mkdtemp(dir=temp_dir)
+ def test_subdirectory_empty(self, tmp_path):
+ (tmp_path / "subdir").mkdir()
+
task = FileSensor(
task_id="test",
- filepath=temp_dir,
+ filepath=tmp_path.as_posix(),
fs_conn_id="fs_default",
dag=self.dag,
timeout=0,
@@ -219,7 +219,6 @@ class TestFileSensor:
with pytest.raises(AirflowSensorTimeout):
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
- shutil.rmtree(temp_dir)
def test_task_defer(self):
task = FileSensor(
diff --git a/tests/serialization/test_serde.py
b/tests/serialization/test_serde.py
index 6091e30afb..ed9e1d1356 100644
--- a/tests/serialization/test_serde.py
+++ b/tests/serialization/test_serde.py
@@ -178,12 +178,12 @@ class TestSerDe:
e = serialize(i)
assert i == e
+ i = {CLASSNAME: "cannot"}
with pytest.raises(AttributeError, match="^reserved"):
- i = {CLASSNAME: "cannot"}
serialize(i)
+ i = {SCHEMA_ID: "cannot"}
with pytest.raises(AttributeError, match="^reserved"):
- i = {SCHEMA_ID: "cannot"}
serialize(i)
def test_ser_namedtuple(self):
@@ -195,8 +195,8 @@ class TestSerDe:
assert i == e
def test_no_serializer(self):
+ i = Exception
with pytest.raises(TypeError, match="^cannot serialize"):
- i = Exception
serialize(i)
def test_ser_registered(self):
diff --git a/tests/utils/test_sqlalchemy.py b/tests/utils/test_sqlalchemy.py
index 1136a106cb..8ee0877dc3 100644
--- a/tests/utils/test_sqlalchemy.py
+++ b/tests/utils/test_sqlalchemy.py
@@ -155,8 +155,8 @@ class TestSqlAlchemyUtils:
guard.commit()
# Check the expected_commit is reset
- with pytest.raises(RuntimeError):
- self.session.execute(text("SELECT 1"))
+ self.session.execute(text("SELECT 1"))
+ with pytest.raises(RuntimeError, match="UNEXPECTED COMMIT"):
self.session.commit()
def test_prohibit_commit_specific_session_only(self):
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index 5bd7577a56..359980ec09 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -636,34 +636,32 @@ def test_duplicate_group_id():
execution_date = pendulum.parse("20200101")
- with pytest.raises(DuplicateTaskIdFound, match=r".* 'task1' .*"):
- with DAG("test_duplicate_group_id", start_date=execution_date):
- _ = EmptyOperator(task_id="task1")
- with TaskGroup("task1"):
- pass
+ with DAG("test_duplicate_group_id", start_date=execution_date):
+ _ = EmptyOperator(task_id="task1")
+ with pytest.raises(DuplicateTaskIdFound, match=r".* 'task1' .*"),
TaskGroup("task1"):
+ pass
- with pytest.raises(DuplicateTaskIdFound, match=r".* 'group1' .*"):
- with DAG("test_duplicate_group_id", start_date=execution_date):
- _ = EmptyOperator(task_id="task1")
- with TaskGroup("group1", prefix_group_id=False):
- with TaskGroup("group1"):
- pass
+ with DAG("test_duplicate_group_id", start_date=execution_date):
+ _ = EmptyOperator(task_id="task1")
+ with TaskGroup("group1", prefix_group_id=False):
+ with pytest.raises(DuplicateTaskIdFound, match=r".* 'group1' .*"),
TaskGroup("group1"):
+ pass
- with pytest.raises(DuplicateTaskIdFound, match=r".* 'group1' .*"):
- with DAG("test_duplicate_group_id", start_date=execution_date):
- with TaskGroup("group1", prefix_group_id=False):
+ with DAG("test_duplicate_group_id", start_date=execution_date):
+ with TaskGroup("group1", prefix_group_id=False):
+ with pytest.raises(DuplicateTaskIdFound, match=r".* 'group1' .*"):
_ = EmptyOperator(task_id="group1")
- with pytest.raises(DuplicateTaskIdFound, match=r".*
'group1.downstream_join_id' .*"):
- with DAG("test_duplicate_group_id", start_date=execution_date):
- _ = EmptyOperator(task_id="task1")
- with TaskGroup("group1"):
+ with DAG("test_duplicate_group_id", start_date=execution_date):
+ _ = EmptyOperator(task_id="task1")
+ with TaskGroup("group1"):
+ with pytest.raises(DuplicateTaskIdFound, match=r".*
'group1.downstream_join_id' .*"):
_ = EmptyOperator(task_id="downstream_join_id")
- with pytest.raises(DuplicateTaskIdFound, match=r".*
'group1.upstream_join_id' .*"):
- with DAG("test_duplicate_group_id", start_date=execution_date):
- _ = EmptyOperator(task_id="task1")
- with TaskGroup("group1"):
+ with DAG("test_duplicate_group_id", start_date=execution_date):
+ _ = EmptyOperator(task_id="task1")
+ with TaskGroup("group1"):
+ with pytest.raises(DuplicateTaskIdFound, match=r".*
'group1.upstream_join_id' .*"):
_ = EmptyOperator(task_id="upstream_join_id")
diff --git a/tests/www/test_app.py b/tests/www/test_app.py
index 571aca79cc..1e7bd67c9a 100644
--- a/tests/www/test_app.py
+++ b/tests/www/test_app.py
@@ -18,8 +18,6 @@
from __future__ import annotations
import hashlib
-import re
-import runpy
import sys
from datetime import timedelta
from unittest import mock
@@ -89,24 +87,9 @@ class TestApp:
assert b"success" == response.get_data()
assert response.status_code == 200
- @pytest.mark.parametrize(
- "base_url, expected_exception",
- [
- ("http://localhost:8080/internal-client", None),
- (
- "http://localhost:8080/internal-client/",
- AirflowConfigException("webserver.base_url conf cannot have a
trailing slash."),
- ),
- ],
- )
@dont_initialize_flask_app_submodules
- def test_should_respect_base_url_ignore_proxy_headers(self, base_url,
expected_exception):
- with conf_vars({("webserver", "base_url"): base_url}):
- if expected_exception:
- with pytest.raises(expected_exception.__class__,
match=re.escape(str(expected_exception))):
- app = application.cached_app(testing=True)
- app.url_map.add(Rule("/debug", endpoint="debug"))
- return
+ def test_should_respect_base_url_ignore_proxy_headers(self):
+ with conf_vars({("webserver", "base_url"):
"http://localhost:8080/internal-client"}):
app = application.cached_app(testing=True)
app.url_map.add(Rule("/debug", endpoint="debug"))
@@ -140,16 +123,14 @@ class TestApp:
assert b"success" == response.get_data()
assert response.status_code == 200
- @pytest.mark.parametrize(
- "base_url, expected_exception",
- [
- ("http://localhost:8080/internal-client", None),
- (
- "http://localhost:8080/internal-client/",
- AirflowConfigException("webserver.base_url conf cannot have a
trailing slash."),
- ),
- ],
- )
+ @dont_initialize_flask_app_submodules
+ def test_base_url_contains_trailing_slash(self):
+ with conf_vars({("webserver", "base_url"):
"http://localhost:8080/internal-client/"}):
+ with pytest.raises(
+ AirflowConfigException, match="webserver.base_url conf cannot
have a trailing slash"
+ ):
+ application.cached_app(testing=True)
+
@conf_vars(
{
("webserver", "enable_proxy_fix"): "True",
@@ -161,15 +142,8 @@ class TestApp:
}
)
@dont_initialize_flask_app_submodules
- def
test_should_respect_base_url_when_proxy_fix_and_base_url_is_set_up_but_headers_missing(
- self, base_url, expected_exception
- ):
- with conf_vars({("webserver", "base_url"): base_url}):
- if expected_exception:
- with pytest.raises(expected_exception.__class__,
match=re.escape(str(expected_exception))):
- app = application.cached_app(testing=True)
- app.url_map.add(Rule("/debug", endpoint="debug"))
- return
+ def
test_should_respect_base_url_when_proxy_fix_and_base_url_is_set_up_but_headers_missing(self):
+ with conf_vars({("webserver", "base_url"):
"http://localhost:8080/internal-client"}):
app = application.cached_app(testing=True)
app.url_map.add(Rule("/debug", endpoint="debug"))
@@ -262,27 +236,28 @@ class TestApp:
assert app.config["SESSION_COOKIE_SAMESITE"] == "Lax"
@pytest.mark.parametrize(
- "hash_method, result, exception",
+ "hash_method, result",
[
- ("sha512", hashlib.sha512, None),
- ("sha384", hashlib.sha384, None),
- ("sha256", hashlib.sha256, None),
- ("sha224", hashlib.sha224, None),
- ("sha1", hashlib.sha1, None),
- ("md5", hashlib.md5, None),
- (None, hashlib.md5, None),
- ("invalid", None, AirflowConfigException),
+ pytest.param("sha512", hashlib.sha512, id="sha512"),
+ pytest.param("sha384", hashlib.sha384, id="sha384"),
+ pytest.param("sha256", hashlib.sha256, id="sha256"),
+ pytest.param("sha224", hashlib.sha224, id="sha224"),
+ pytest.param("sha1", hashlib.sha1, id="sha1"),
+ pytest.param("md5", hashlib.md5, id="md5"),
+ pytest.param(None, hashlib.md5, id="default"),
],
)
- @dont_initialize_flask_app_submodules
- def test_should_respect_caching_hash_method(self, hash_method, result,
exception):
+
@dont_initialize_flask_app_submodules(skip_all_except=["init_auth_manager"])
+ def test_should_respect_caching_hash_method(self, hash_method, result):
with conf_vars({("webserver", "caching_hash_method"): hash_method}):
- if exception:
- with pytest.raises(expected_exception=exception):
- app = application.cached_app(testing=True)
- else:
- app = application.cached_app(testing=True)
- assert next(iter(app.extensions["cache"])).cache._hash_method
== result
+ app = application.cached_app(testing=True)
+ assert next(iter(app.extensions["cache"])).cache._hash_method ==
result
+
+ @dont_initialize_flask_app_submodules
+ def test_should_respect_caching_hash_method_invalid(self):
+ with conf_vars({("webserver", "caching_hash_method"): "invalid"}):
+ with pytest.raises(expected_exception=AirflowConfigException):
+ application.cached_app(testing=True)
class TestFlaskCli:
@@ -290,11 +265,12 @@ class TestFlaskCli:
def test_flask_cli_should_display_routes(self, capsys):
with mock.patch.dict("os.environ",
FLASK_APP="airflow.www.app:cached_app"), mock.patch.object(
sys, "argv", ["flask", "routes"]
- ), pytest.raises(SystemExit):
- from flask import __main__
-
- # We are not using run_module because of
https://github.com/pytest-dev/pytest/issues/9007
- runpy.run_path(__main__.__file__, run_name="main")
+ ):
+ # Import from flask.__main__ with a combination of mocking With
mocking sys.argv
+ # will invoke ``flask routes`` command.
+ with pytest.raises(SystemExit) as ex_ctx:
+ from flask import __main__ # noqa: F401
+ assert ex_ctx.value.code == 0
output = capsys.readouterr()
assert "/login/" in output.out