This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-5-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 2ecbfd0118ac1db9bf87ed9a44853b11d6b972f0 Author: Ash Berlin-Taylor <[email protected]> AuthorDate: Mon Dec 12 17:38:03 2022 +0000 Replace freezegun with time-machine (#28193) The primary driver for this was a niggle that the durations output for one test was reporting over 52 years: > 1670340356.40s call tests/jobs/test_base_job.py::TestBaseJob::test_heartbeat It turns out this was caused by freezegun, but time_machine fixes this. It also might be a bit faster, but that isn't a noticeable difference for us. (No runtime difference for the changed files, but it does make collection quicker: from 10s to 8s) (cherry picked from commit 4d0fd8ef6adc35f683c7561f05688a65fd7451f4) --- setup.py | 3 +- tests/api/client/test_local_client.py | 4 +- .../endpoints/test_dag_run_endpoint.py | 4 +- tests/api_connexion/schemas/test_dataset_schema.py | 4 +- tests/conftest.py | 20 ++-- tests/core/test_sentry.py | 4 +- tests/dag_processing/test_manager.py | 6 +- tests/executors/test_celery_executor.py | 6 +- tests/jobs/test_scheduler_job.py | 4 +- tests/models/test_dag.py | 6 +- tests/models/test_dagbag.py | 14 +-- tests/models/test_taskinstance.py | 18 ++-- tests/models/test_timestamp.py | 6 +- tests/operators/test_datetime.py | 16 +-- tests/operators/test_latest_only_operator.py | 4 +- tests/operators/test_weekday.py | 10 +- tests/providers/amazon/aws/hooks/test_eks.py | 10 +- .../amazon/aws/sensors/test_s3_keys_unchanged.py | 24 ++--- .../amazon/aws/utils/test_eks_get_token.py | 4 +- .../elasticsearch/log/test_es_task_handler.py | 15 ++- .../test_cloud_storage_transfer_service.py | 4 +- tests/sensors/test_base.py | 107 ++++++++++----------- tests/sensors/test_time_sensor.py | 11 +-- tests/ti_deps/deps/test_not_in_retry_period_dep.py | 6 +- tests/ti_deps/deps/test_runnable_exec_date_dep.py | 6 +- tests/timetables/test_interval_timetable.py | 8 +- tests/timetables/test_trigger_timetable.py | 6 +- tests/utils/log/test_file_processor_handler.py | 8 +- tests/utils/test_serve_logs.py | 14 +-- tests/www/test_security.py | 10 +- tests/www/views/test_views_grid.py | 23 +++-- tests/www/views/test_views_tasks.py | 17 ++-- 32 files changed, 194 insertions(+), 208 deletions(-) diff --git a/setup.py b/setup.py index 5ed4bb756f..a132eb861c 100644 --- a/setup.py +++ b/setup.py @@ -339,7 +339,6 @@ mypy_dependencies = [ "types-croniter", "types-Deprecated", "types-docutils", - "types-freezegun", "types-paramiko", "types-protobuf", "types-python-dateutil", @@ -371,7 +370,6 @@ devel_only = [ "flake8-colors", "flake8-implicit-str-concat", "flaky", - "freezegun", "gitpython", "ipdb", # make sure that we are using stable sorting order from 5.* version (some changes were introduced @@ -403,6 +401,7 @@ devel_only = [ "requests_mock", "rich-click>=1.5", "semver", + "time-machine", "towncrier", "twine", "wheel", diff --git a/tests/api/client/test_local_client.py b/tests/api/client/test_local_client.py index 70188ba5e1..d079f6c510 100644 --- a/tests/api/client/test_local_client.py +++ b/tests/api/client/test_local_client.py @@ -24,7 +24,7 @@ from unittest.mock import patch import pendulum import pytest -from freezegun import freeze_time +import time_machine from airflow.api.client.local_client import Client from airflow.example_dags import example_bash_operator @@ -72,7 +72,7 @@ class TestLocalClient: run_after=pendulum.instance(EXECDATE_NOFRACTIONS) ) - with freeze_time(EXECDATE): + with time_machine.travel(EXECDATE, tick=False): # no execution date, execution date should be set automatically self.client.trigger_dag(dag_id=test_dag_id) diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index 93c9c6a1d0..a80e8b9f4a 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -20,7 +20,7 @@ from datetime import timedelta from unittest import mock import pytest -from freezegun import freeze_time +import time_machine from parameterized import parameterized from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP @@ -1350,7 +1350,7 @@ class TestPatchDagRunState(TestDagRunEndpoint): } @pytest.mark.parametrize("invalid_state", ["running"]) - @freeze_time(TestDagRunEndpoint.default_time) + @time_machine.travel(TestDagRunEndpoint.default_time) def test_should_response_400_for_non_existing_dag_run_state(self, invalid_state, dag_maker): dag_id = "TEST_DAG_ID" dag_run_id = "TEST_DAG_RUN_ID" diff --git a/tests/api_connexion/schemas/test_dataset_schema.py b/tests/api_connexion/schemas/test_dataset_schema.py index 5147768aed..85deb129f3 100644 --- a/tests/api_connexion/schemas/test_dataset_schema.py +++ b/tests/api_connexion/schemas/test_dataset_schema.py @@ -16,7 +16,7 @@ # under the License. from __future__ import annotations -from freezegun import freeze_time +import time_machine from airflow.api_connexion.schemas.dataset_schema import ( DatasetCollection, @@ -37,7 +37,7 @@ class TestDatasetSchemaBase: clear_db_dags() clear_db_datasets() self.timestamp = "2022-06-10T12:02:44+00:00" - self.freezer = freeze_time(self.timestamp) + self.freezer = time_machine.travel(self.timestamp, tick=False) self.freezer.start() def teardown_method(self) -> None: diff --git a/tests/conftest.py b/tests/conftest.py index bdaec7da0f..74d412f849 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -24,8 +24,8 @@ from contextlib import ExitStack, suppress from datetime import datetime, timedelta from typing import TYPE_CHECKING -import freezegun import pytest +import time_machine # We should set these before loading _any_ of the rest of airflow so that the # unit test mode config is set as early as possible. @@ -400,7 +400,7 @@ def pytest_runtest_setup(item): @pytest.fixture def frozen_sleep(monkeypatch): """ - Use freezegun to "stub" sleep, so that it takes no time, but that + Use time-machine to "stub" sleep, so that it takes no time, but that ``datetime.now()`` appears to move forwards If your module under test does ``import time`` and then ``time.sleep``:: @@ -416,21 +416,21 @@ def frozen_sleep(monkeypatch): monkeypatch.setattr('my_mod.sleep', frozen_sleep) my_mod.fn_under_test() """ - freezegun_control = None + traveller = None def fake_sleep(seconds): - nonlocal freezegun_control + nonlocal traveller utcnow = datetime.utcnow() - if freezegun_control is not None: - freezegun_control.stop() - freezegun_control = freezegun.freeze_time(utcnow + timedelta(seconds=seconds)) - freezegun_control.start() + if traveller is not None: + traveller.stop() + traveller = time_machine.travel(utcnow + timedelta(seconds=seconds)) + traveller.start() monkeypatch.setattr("time.sleep", fake_sleep) yield fake_sleep - if freezegun_control is not None: - freezegun_control.stop() + if traveller is not None: + traveller.stop() @pytest.fixture(scope="session") diff --git a/tests/core/test_sentry.py b/tests/core/test_sentry.py index 2b29a1c704..0e8607f2ad 100644 --- a/tests/core/test_sentry.py +++ b/tests/core/test_sentry.py @@ -22,7 +22,7 @@ import importlib from unittest import mock import pytest -from freezegun import freeze_time +import time_machine from sentry_sdk import configure_scope from airflow.operators.python import PythonOperator @@ -129,7 +129,7 @@ class TestSentryHook: for key, value in scope._tags.items(): assert TEST_SCOPE[key] == value - @freeze_time(CRUMB_DATE.isoformat()) + @time_machine.travel(CRUMB_DATE) def test_add_breadcrumbs(self, sentry, task_instance): """ Test adding breadcrumbs. diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index 1cbfb7275d..862dfdb2c8 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -34,7 +34,7 @@ from unittest import mock from unittest.mock import MagicMock, PropertyMock import pytest -from freezegun import freeze_time +import time_machine from sqlalchemy import func from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest, SlaCallbackRequest @@ -470,7 +470,7 @@ class TestDagFileProcessorManager: manager._file_stats = { "file_1.py": DagFileStat(1, 0, last_finish_time, timedelta(seconds=1.0), 1), } - with freeze_time(freezed_base_time): + with time_machine.travel(freezed_base_time): manager.set_file_paths(dag_files) assert manager._file_path_queue == collections.deque() # File Path Queue will be empty as the "modified time" < "last finish time" @@ -481,7 +481,7 @@ class TestDagFileProcessorManager: # than the last_parse_time but still less than now - min_file_process_interval file_1_new_mtime = freezed_base_time - timedelta(seconds=5) file_1_new_mtime_ts = file_1_new_mtime.timestamp() - with freeze_time(freezed_base_time): + with time_machine.travel(freezed_base_time): manager.set_file_paths(dag_files) assert manager._file_path_queue == collections.deque() # File Path Queue will be empty as the "modified time" < "last finish time" diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index cbbe64c564..2a52776dec 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -27,9 +27,9 @@ from unittest import mock # leave this it is used by the test worker import celery.contrib.testing.tasks # noqa: F401 import pytest +import time_machine from celery import Celery from celery.result import AsyncResult -from freezegun import freeze_time from kombu.asynchronous import set_event_loop from parameterized import parameterized @@ -162,7 +162,7 @@ class TestCeleryExecutor: assert executor.try_adopt_task_instances(tis) == tis @pytest.mark.backend("mysql", "postgres") - @freeze_time("2020-01-01") + @time_machine.travel("2020-01-01", tick=False) def test_try_adopt_task_instances(self): start_date = timezone.utcnow() - timedelta(days=2) @@ -270,7 +270,7 @@ class TestCeleryExecutor: assert ti.external_executor_id is None @pytest.mark.backend("mysql", "postgres") - @freeze_time("2020-01-01") + @time_machine.travel("2020-01-01", tick=False) def test_pending_tasks_timeout_with_appropriate_config_setting(self): start_date = timezone.utcnow() - timedelta(days=2) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index b661293ea2..9d1b4dd452 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -30,7 +30,7 @@ from unittest.mock import MagicMock, patch import psutil import pytest -from freezegun import freeze_time +import time_machine from sqlalchemy import func import airflow.example_dags @@ -3290,7 +3290,7 @@ class TestSchedulerJob: assert dag3.get_last_dagrun().creating_job_id == self.scheduler_job.id - @freeze_time(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9)) + @time_machine.travel(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9), tick=False) @mock.patch("airflow.jobs.scheduler_job.Stats.timing") def test_start_dagruns(self, stats_timing, dag_maker): """ diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 855ecc4b6f..6fc846f20a 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -34,8 +34,8 @@ from unittest.mock import patch import jinja2 import pendulum import pytest +import time_machine from dateutil.relativedelta import relativedelta -from freezegun import freeze_time from sqlalchemy import inspect import airflow @@ -2122,7 +2122,7 @@ my_postgres_conn: # The DR should be scheduled in the last 2 hours, not 6 hours ago assert next_date == six_hours_ago_to_the_hour - @freeze_time(timezone.datetime(2020, 1, 5)) + @time_machine.travel(timezone.datetime(2020, 1, 5), tick=False) def test_next_dagrun_info_timedelta_schedule_and_catchup_false(self): """ Test that the dag file processor does not create multiple dagruns @@ -2142,7 +2142,7 @@ my_postgres_conn: next_info = dag.next_dagrun_info(next_info.data_interval) assert next_info and next_info.logical_date == timezone.datetime(2020, 1, 5) - @freeze_time(timezone.datetime(2020, 5, 4)) + @time_machine.travel(timezone.datetime(2020, 5, 4)) def test_next_dagrun_info_timedelta_schedule_and_catchup_true(self): """ Test that the dag file processor creates multiple dagruns diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index 102ad0b518..dbeaba5dd1 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -32,7 +32,7 @@ from unittest import mock from unittest.mock import patch import pytest -from freezegun import freeze_time +import time_machine from sqlalchemy import func from sqlalchemy.exc import OperationalError @@ -879,7 +879,7 @@ class TestDagBag: """ db_clean_up() session = settings.Session() - with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)) as frozen_time: + with time_machine.travel(tz.datetime(2020, 1, 5, 0, 0, 0), tick=False) as frozen_time: dagbag = DagBag( dag_folder=os.path.join(TEST_DAGS_FOLDER, "test_example_bash_operator.py"), include_examples=False, @@ -889,7 +889,7 @@ class TestDagBag: def _sync_to_db(): mock_sync_perm_for_dag.reset_mock() - frozen_time.tick(20) + frozen_time.shift(20) dagbag.sync_to_db(session=session) dag = dagbag.dags["test_example_bash_operator"] @@ -950,7 +950,7 @@ class TestDagBag: Serialized DAG table after 'min_serialized_dag_fetch_interval' seconds are passed. """ - with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)): + with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 0)), tick=False): example_bash_op_dag = DagBag(include_examples=True).dags.get("example_bash_operator") SerializedDagModel.write_dag(dag=example_bash_op_dag) @@ -962,18 +962,18 @@ class TestDagBag: # Check that if min_serialized_dag_fetch_interval has not passed we do not fetch the DAG # from DB - with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 4)): + with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 4)), tick=False): with assert_queries_count(0): assert dag_bag.get_dag("example_bash_operator").tags == ["example", "example2"] # Make a change in the DAG and write Serialized DAG to the DB - with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 6)): + with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 6)), tick=False): example_bash_op_dag.tags += ["new_tag"] SerializedDagModel.write_dag(dag=example_bash_op_dag) # Since min_serialized_dag_fetch_interval is passed verify that calling 'dag_bag.get_dag' # fetches the Serialized DAG from DB - with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 8)): + with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 8)), tick=False): with assert_queries_count(2): updated_ser_dag_1 = dag_bag.get_dag("example_bash_operator") updated_ser_dag_1_update_time = dag_bag.dags_last_fetched["example_bash_operator"] diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 80b6d6d302..9bcfe5f677 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -33,7 +33,7 @@ from uuid import uuid4 import pendulum import pytest -from freezegun import freeze_time +import time_machine from airflow import models, settings from airflow.decorators import task, task_group @@ -564,11 +564,11 @@ class TestTaskInstance: assert ti.next_kwargs is None assert ti.state == state - @freeze_time("2021-09-19 04:56:35", as_kwarg="frozen_time") - def test_retry_delay(self, dag_maker, frozen_time=None): + def test_retry_delay(self, dag_maker, time_machine): """ Test that retry delays are respected """ + time_machine.move_to("2021-09-19 04:56:35", tick=False) with dag_maker(dag_id="test_retry_handling"): task = BashOperator( task_id="test_retry_handling_op", @@ -593,12 +593,12 @@ class TestTaskInstance: assert ti.try_number == 2 # second run -- still up for retry because retry_delay hasn't expired - frozen_time.tick(delta=datetime.timedelta(seconds=3)) + time_machine.coordinates.shift(3) run_with_error(ti) assert ti.state == State.UP_FOR_RETRY # third run -- failed - frozen_time.tick(delta=datetime.datetime.resolution) + time_machine.coordinates.shift(datetime.datetime.resolution) run_with_error(ti) assert ti.state == State.FAILED @@ -756,7 +756,7 @@ class TestTaskInstance: expected_try_number, expected_task_reschedule_count, ): - with freeze_time(run_date): + with time_machine.travel(run_date, tick=False): try: ti.run() except AirflowException: @@ -856,7 +856,7 @@ class TestTaskInstance: expected_task_reschedule_count, ): ti.refresh_from_task(task) - with freeze_time(run_date): + with time_machine.travel(run_date, tick=False): try: ti.run() except AirflowException: @@ -955,7 +955,7 @@ class TestTaskInstance: expected_task_reschedule_count, ): ti.refresh_from_task(task) - with freeze_time(run_date): + with time_machine.travel(run_date, tick=False): try: ti.run() except AirflowException: @@ -1023,7 +1023,7 @@ class TestTaskInstance: expected_try_number, expected_task_reschedule_count, ): - with freeze_time(run_date): + with time_machine.travel(run_date, tick=False): try: ti.run() except AirflowException: diff --git a/tests/models/test_timestamp.py b/tests/models/test_timestamp.py index 0313183f12..2315e25dd4 100644 --- a/tests/models/test_timestamp.py +++ b/tests/models/test_timestamp.py @@ -18,7 +18,7 @@ from __future__ import annotations import pendulum import pytest -from freezegun import freeze_time +import time_machine from airflow.models import Log, TaskInstance from airflow.operators.empty import EmptyOperator @@ -53,7 +53,7 @@ def add_log(execdate, session, dag_maker, timezone_override=None): @provide_session def test_timestamp_behaviour(dag_maker, session=None): execdate = timezone.utcnow() - with freeze_time(execdate): + with time_machine.travel(execdate, tick=False): current_time = timezone.utcnow() old_log = add_log(execdate, session, dag_maker) session.expunge(old_log) @@ -65,7 +65,7 @@ def test_timestamp_behaviour(dag_maker, session=None): @provide_session def test_timestamp_behaviour_with_timezone(dag_maker, session=None): execdate = timezone.utcnow() - with freeze_time(execdate): + with time_machine.travel(execdate, tick=False): current_time = timezone.utcnow() old_log = add_log(execdate, session, dag_maker, timezone_override=pendulum.timezone("Europe/Warsaw")) session.expunge(old_log) diff --git a/tests/operators/test_datetime.py b/tests/operators/test_datetime.py index 242442e8ea..027c4c1f0f 100644 --- a/tests/operators/test_datetime.py +++ b/tests/operators/test_datetime.py @@ -20,8 +20,8 @@ from __future__ import annotations import datetime import unittest -import freezegun import pytest +import time_machine from airflow.exceptions import AirflowException from airflow.models import DAG, DagRun, TaskInstance as TI @@ -113,7 +113,7 @@ class TestBranchDateTimeOperator(unittest.TestCase): dag=self.dag, ) - @freezegun.freeze_time("2020-07-07 10:54:05") + @time_machine.travel("2020-07-07 10:54:05") def test_branch_datetime_operator_falls_within_range(self): """Check BranchDateTimeOperator branch operation""" for target_lower, target_upper in self.targets: @@ -143,7 +143,7 @@ class TestBranchDateTimeOperator(unittest.TestCase): self.branch_op.target_upper = target_upper for date in dates: - with freezegun.freeze_time(date): + with time_machine.travel(date): self.branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) self._assert_task_ids_match_states( @@ -154,7 +154,7 @@ class TestBranchDateTimeOperator(unittest.TestCase): } ) - @freezegun.freeze_time("2020-07-07 10:54:05") + @time_machine.travel("2020-07-07 10:54:05") def test_branch_datetime_operator_upper_comparison_within_range(self): """Check BranchDateTimeOperator branch operation""" for _, target_upper in self.targets: @@ -172,7 +172,7 @@ class TestBranchDateTimeOperator(unittest.TestCase): } ) - @freezegun.freeze_time("2020-07-07 10:54:05") + @time_machine.travel("2020-07-07 10:54:05") def test_branch_datetime_operator_lower_comparison_within_range(self): """Check BranchDateTimeOperator branch operation""" for target_lower, _ in self.targets: @@ -190,7 +190,7 @@ class TestBranchDateTimeOperator(unittest.TestCase): } ) - @freezegun.freeze_time("2020-07-07 12:00:00") + @time_machine.travel("2020-07-07 12:00:00") def test_branch_datetime_operator_upper_comparison_outside_range(self): """Check BranchDateTimeOperator branch operation""" for _, target_upper in self.targets: @@ -208,7 +208,7 @@ class TestBranchDateTimeOperator(unittest.TestCase): } ) - @freezegun.freeze_time("2020-07-07 09:00:00") + @time_machine.travel("2020-07-07 09:00:00") def test_branch_datetime_operator_lower_comparison_outside_range(self): """Check BranchDateTimeOperator branch operation""" for target_lower, _ in self.targets: @@ -226,7 +226,7 @@ class TestBranchDateTimeOperator(unittest.TestCase): } ) - @freezegun.freeze_time("2020-12-01 09:00:00") + @time_machine.travel("2020-12-01 09:00:00") def test_branch_datetime_operator_use_task_logical_date(self): """Check if BranchDateTimeOperator uses task execution date""" in_between_date = timezone.datetime(2020, 7, 7, 10, 30, 0) diff --git a/tests/operators/test_latest_only_operator.py b/tests/operators/test_latest_only_operator.py index cf1c3ca18f..8c0e3d0ae1 100644 --- a/tests/operators/test_latest_only_operator.py +++ b/tests/operators/test_latest_only_operator.py @@ -19,7 +19,7 @@ from __future__ import annotations import datetime -from freezegun import freeze_time +import time_machine from airflow import settings from airflow.models import DagRun, TaskInstance @@ -64,7 +64,7 @@ class TestLatestOnlyOperator: default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, schedule=INTERVAL, ) - self.freezer = freeze_time(FROZEN_NOW) + self.freezer = time_machine.travel(FROZEN_NOW, tick=False) self.freezer.start() def teardown_method(self): diff --git a/tests/operators/test_weekday.py b/tests/operators/test_weekday.py index 046e593578..1c843c94c9 100644 --- a/tests/operators/test_weekday.py +++ b/tests/operators/test_weekday.py @@ -20,7 +20,7 @@ from __future__ import annotations import datetime import pytest -from freezegun import freeze_time +import time_machine from parameterized import parameterized from airflow.exceptions import AirflowException @@ -94,7 +94,7 @@ class TestBranchDayOfWeekOperator: @pytest.mark.parametrize( "weekday", TEST_CASE_BRANCH_FOLLOW_TRUE.values(), ids=TEST_CASE_BRANCH_FOLLOW_TRUE.keys() ) - @freeze_time("2021-01-25") # Monday + @time_machine.travel("2021-01-25") # Monday def test_branch_follow_true(self, weekday): """Checks if BranchDayOfWeekOperator follows true branch""" print(datetime.datetime.now()) @@ -131,7 +131,7 @@ class TestBranchDayOfWeekOperator: }, ) - @freeze_time("2021-01-25") # Monday + @time_machine.travel("2021-01-25") # Monday def test_branch_follow_true_with_execution_date(self): """Checks if BranchDayOfWeekOperator follows true branch when set use_task_logical_date""" @@ -166,7 +166,7 @@ class TestBranchDayOfWeekOperator: }, ) - @freeze_time("2021-01-25") # Monday + @time_machine.travel("2021-01-25") # Monday def test_branch_follow_false(self): """Checks if BranchDayOfWeekOperator follow false branch""" @@ -245,7 +245,7 @@ class TestBranchDayOfWeekOperator: dag=self.dag, ) - @freeze_time("2021-01-25") # Monday + @time_machine.travel("2021-01-25") # Monday def test_branch_xcom_push_true_branch(self): """Check if BranchDayOfWeekOperator push to xcom value of follow_task_ids_if_true""" branch_op = BranchDayOfWeekOperator( diff --git a/tests/providers/amazon/aws/hooks/test_eks.py b/tests/providers/amazon/aws/hooks/test_eks.py index 3d3e51f94a..157c870973 100644 --- a/tests/providers/amazon/aws/hooks/test_eks.py +++ b/tests/providers/amazon/aws/hooks/test_eks.py @@ -25,10 +25,10 @@ from unittest import mock from urllib.parse import ParseResult, urlsplit import pytest +import time_machine import yaml from _pytest._code import ExceptionInfo from botocore.exceptions import ClientError -from freezegun import freeze_time from moto import mock_eks from moto.core import DEFAULT_ACCOUNT_ID from moto.core.exceptions import AWSError @@ -298,7 +298,7 @@ class TestEksHooks: arn_under_test=generated_test_data.cluster_describe_output[ClusterAttributes.ARN], ) - @freeze_time(FROZEN_TIME) + @time_machine.travel(FROZEN_TIME, tick=False) def test_create_cluster_generates_valid_cluster_created_timestamp(self, cluster_builder) -> None: _, generated_test_data = cluster_builder() @@ -515,7 +515,7 @@ class TestEksHooks: arn_under_test=generated_test_data.nodegroup_describe_output[NodegroupAttributes.ARN], ) - @freeze_time(FROZEN_TIME) + @time_machine.travel(FROZEN_TIME) def test_create_nodegroup_generates_valid_nodegroup_created_timestamp(self, nodegroup_builder) -> None: _, generated_test_data = nodegroup_builder() @@ -523,7 +523,7 @@ class TestEksHooks: assert iso_date(result_time) == FROZEN_TIME - @freeze_time(FROZEN_TIME) + @time_machine.travel(FROZEN_TIME) def test_create_nodegroup_generates_valid_nodegroup_modified_timestamp(self, nodegroup_builder) -> None: _, generated_test_data = nodegroup_builder() @@ -917,7 +917,7 @@ class TestEksHooks: arn_under_test=generated_test_data.fargate_describe_output[FargateProfileAttributes.ARN], ) - @freeze_time(FROZEN_TIME) + @time_machine.travel(FROZEN_TIME) def test_create_fargate_profile_generates_valid_created_timestamp(self, fargate_profile_builder) -> None: _, generated_test_data = fargate_profile_builder() diff --git a/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py b/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py index 251f8d6258..726c4de7db 100644 --- a/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py +++ b/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py @@ -21,7 +21,7 @@ from datetime import datetime from unittest import mock import pytest -from freezegun import freeze_time +import time_machine from airflow.models.dag import DAG, AirflowException from airflow.providers.amazon.aws.sensors.s3 import S3KeysUnchangedSensor @@ -68,7 +68,7 @@ class TestS3KeysUnchangedSensor: dag=self.dag, ).render_template_fields({}) - @freeze_time(DEFAULT_DATE, auto_tick_seconds=10) + @time_machine.travel(DEFAULT_DATE) def test_files_deleted_between_pokes_throw_error(self): self.sensor.allow_delete = False self.sensor.is_keys_unchanged({"a", "b"}) @@ -98,19 +98,19 @@ class TestS3KeysUnchangedSensor: ), ], ) - @freeze_time(DEFAULT_DATE, auto_tick_seconds=10) - def test_key_changes(self, current_objects, expected_returns, inactivity_periods): - assert self.sensor.is_keys_unchanged(current_objects[0]) == expected_returns[0] - assert self.sensor.inactivity_seconds == inactivity_periods[0] - assert self.sensor.is_keys_unchanged(current_objects[1]) == expected_returns[1] - assert self.sensor.inactivity_seconds == inactivity_periods[1] - assert self.sensor.is_keys_unchanged(current_objects[2]) == expected_returns[2] - assert self.sensor.inactivity_seconds == inactivity_periods[2] + def test_key_changes(self, current_objects, expected_returns, inactivity_periods, time_machine): + time_machine.move_to(DEFAULT_DATE) + for current, expected, period in zip(current_objects, expected_returns, inactivity_periods): + assert self.sensor.is_keys_unchanged(current) == expected + assert self.sensor.inactivity_seconds == period + time_machine.coordinates.shift(10) - @freeze_time(DEFAULT_DATE, auto_tick_seconds=10) @mock.patch("airflow.providers.amazon.aws.sensors.s3.S3Hook") - def test_poke_succeeds_on_upload_complete(self, mock_hook): + def test_poke_succeeds_on_upload_complete(self, mock_hook, time_machine): + time_machine.move_to(DEFAULT_DATE) mock_hook.return_value.list_keys.return_value = {"a"} assert not self.sensor.poke(dict()) + time_machine.coordinates.shift(10) assert not self.sensor.poke(dict()) + time_machine.coordinates.shift(10) assert self.sensor.poke(dict()) diff --git a/tests/providers/amazon/aws/utils/test_eks_get_token.py b/tests/providers/amazon/aws/utils/test_eks_get_token.py index 700bd18373..f4b71cb94b 100644 --- a/tests/providers/amazon/aws/utils/test_eks_get_token.py +++ b/tests/providers/amazon/aws/utils/test_eks_get_token.py @@ -23,12 +23,12 @@ import runpy from unittest import mock import pytest -from freezegun import freeze_time +import time_machine class TestGetEksToken: @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook") - @freeze_time("1995-02-14") + @time_machine.travel("1995-02-14", tick=False) @pytest.mark.parametrize( "args, expected_aws_conn_id, expected_region_name", [ diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py index bdf274732e..5e23f0bd92 100644 --- a/tests/providers/elasticsearch/log/test_es_task_handler.py +++ b/tests/providers/elasticsearch/log/test_es_task_handler.py @@ -27,7 +27,6 @@ from unittest import mock from urllib.parse import quote import elasticsearch -import freezegun import pendulum import pytest @@ -500,7 +499,7 @@ class TestElasticsearchTaskHandler: assert self.es_task_handler.supports_external_link == expected @mock.patch("sys.__stdout__", new_callable=io.StringIO) - def test_dynamic_offset(self, stdout_mock, ti): + def test_dynamic_offset(self, stdout_mock, ti, time_machine): # arrange handler = ElasticsearchTaskHandler( base_log_folder=self.local_log_location, @@ -524,12 +523,12 @@ class TestElasticsearchTaskHandler: t2, t3 = t1 + pendulum.duration(seconds=5), t1 + pendulum.duration(seconds=10) # act - with freezegun.freeze_time(t1): - ti.log.info("Test") - with freezegun.freeze_time(t2): - ti.log.info("Test2") - with freezegun.freeze_time(t3): - ti.log.info("Test3") + time_machine.move_to(t1, tick=False) + ti.log.info("Test") + time_machine.move_to(t2, tick=False) + ti.log.info("Test2") + time_machine.move_to(t3, tick=False) + ti.log.info("Test3") # assert first_log, second_log, third_log = map(json.loads, stdout_mock.getvalue().strip().split("\n")) diff --git a/tests/providers/google/cloud/operators/test_cloud_storage_transfer_service.py b/tests/providers/google/cloud/operators/test_cloud_storage_transfer_service.py index d696ffa14d..3c45636214 100644 --- a/tests/providers/google/cloud/operators/test_cloud_storage_transfer_service.py +++ b/tests/providers/google/cloud/operators/test_cloud_storage_transfer_service.py @@ -24,8 +24,8 @@ from datetime import date, time from unittest import mock import pytest +import time_machine from botocore.credentials import Credentials -from freezegun import freeze_time from parameterized import parameterized from airflow.exceptions import AirflowException @@ -189,7 +189,7 @@ class TestTransferJobPreprocessor(unittest.TestCase): TransferJobPreprocessor(body=body).process_body() assert body[SCHEDULE][START_TIME_OF_DAY] == DICT_TIME - @freeze_time("2018-10-15") + @time_machine.travel("2018-10-15", tick=False) def test_should_set_default_schedule(self): body = {} TransferJobPreprocessor(body=body, default_schedule=True).process_body() diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py index 8545b99f59..7417c183d8 100644 --- a/tests/sensors/test_base.py +++ b/tests/sensors/test_base.py @@ -21,7 +21,7 @@ from datetime import timedelta from unittest.mock import Mock, patch import pytest -from freezegun import freeze_time +import time_machine from airflow.exceptions import AirflowException, AirflowRescheduleException, AirflowSensorTimeout from airflow.models import TaskReschedule @@ -158,14 +158,14 @@ class TestBaseSensor: if ti.task_id == DUMMY_OP: assert ti.state == State.NONE - def test_ok_with_reschedule(self, make_sensor): + def test_ok_with_reschedule(self, make_sensor, time_machine): sensor, dr = make_sensor(return_value=None, poke_interval=10, timeout=25, mode="reschedule") sensor.poke = Mock(side_effect=[False, False, True]) # first poke returns False and task is re-scheduled date1 = timezone.utcnow() - with freeze_time(date1): - self._run(sensor) + time_machine.move_to(date1, tick=False) + self._run(sensor) tis = dr.get_task_instances() assert len(tis) == 2 for ti in tis: @@ -183,9 +183,9 @@ class TestBaseSensor: assert ti.state == State.NONE # second poke returns False and task is re-scheduled + time_machine.coordinates.shift(sensor.poke_interval) date2 = date1 + timedelta(seconds=sensor.poke_interval) - with freeze_time(date2): - self._run(sensor) + self._run(sensor) tis = dr.get_task_instances() assert len(tis) == 2 for ti in tis: @@ -203,9 +203,8 @@ class TestBaseSensor: assert ti.state == State.NONE # third poke returns True and task succeeds - date3 = date2 + timedelta(seconds=sensor.poke_interval) - with freeze_time(date3): - self._run(sensor) + time_machine.coordinates.shift(sensor.poke_interval) + self._run(sensor) tis = dr.get_task_instances() assert len(tis) == 2 for ti in tis: @@ -216,13 +215,13 @@ class TestBaseSensor: if ti.task_id == DUMMY_OP: assert ti.state == State.NONE - def test_fail_with_reschedule(self, make_sensor): + def test_fail_with_reschedule(self, make_sensor, time_machine): sensor, dr = make_sensor(return_value=False, poke_interval=10, timeout=5, mode="reschedule") # first poke returns False and task is re-scheduled date1 = timezone.utcnow() - with freeze_time(date1): - self._run(sensor) + time_machine.move_to(date1, tick=False) + self._run(sensor) tis = dr.get_task_instances() assert len(tis) == 2 for ti in tis: @@ -232,10 +231,9 @@ class TestBaseSensor: assert ti.state == State.NONE # second poke returns False, timeout occurs - date2 = date1 + timedelta(seconds=sensor.poke_interval) - with freeze_time(date2): - with pytest.raises(AirflowSensorTimeout): - self._run(sensor) + time_machine.coordinates.shift(sensor.poke_interval) + with pytest.raises(AirflowSensorTimeout): + self._run(sensor) tis = dr.get_task_instances() assert len(tis) == 2 for ti in tis: @@ -244,15 +242,15 @@ class TestBaseSensor: if ti.task_id == DUMMY_OP: assert ti.state == State.NONE - def test_soft_fail_with_reschedule(self, make_sensor): + def test_soft_fail_with_reschedule(self, make_sensor, time_machine): sensor, dr = make_sensor( return_value=False, poke_interval=10, timeout=5, soft_fail=True, mode="reschedule" ) # first poke returns False and task is re-scheduled date1 = timezone.utcnow() - with freeze_time(date1): - self._run(sensor) + time_machine.move_to(date1, tick=False) + self._run(sensor) tis = dr.get_task_instances() assert len(tis) == 2 for ti in tis: @@ -262,9 +260,8 @@ class TestBaseSensor: assert ti.state == State.NONE # second poke returns False, timeout occurs - date2 = date1 + timedelta(seconds=sensor.poke_interval) - with freeze_time(date2): - self._run(sensor) + time_machine.coordinates.shift(sensor.poke_interval) + self._run(sensor) tis = dr.get_task_instances() assert len(tis) == 2 for ti in tis: @@ -273,7 +270,7 @@ class TestBaseSensor: if ti.task_id == DUMMY_OP: assert ti.state == State.NONE - def test_ok_with_reschedule_and_retry(self, make_sensor): + def test_ok_with_reschedule_and_retry(self, make_sensor, time_machine): sensor, dr = make_sensor( return_value=None, poke_interval=10, @@ -286,8 +283,8 @@ class TestBaseSensor: # first poke returns False and task is re-scheduled date1 = timezone.utcnow() - with freeze_time(date1): - self._run(sensor) + time_machine.move_to(date1, tick=False) + self._run(sensor) tis = dr.get_task_instances() assert len(tis) == 2 for ti in tis: @@ -303,10 +300,9 @@ class TestBaseSensor: assert ti.state == State.NONE # second poke timesout and task instance is failed - date2 = date1 + timedelta(seconds=sensor.poke_interval) - with freeze_time(date2): - with pytest.raises(AirflowSensorTimeout): - self._run(sensor) + time_machine.coordinates.shift(sensor.poke_interval) + with pytest.raises(AirflowSensorTimeout): + self._run(sensor) tis = dr.get_task_instances() assert len(tis) == 2 for ti in tis: @@ -319,9 +315,9 @@ class TestBaseSensor: sensor.clear() # third poke returns False and task is rescheduled again - date3 = date2 + timedelta(seconds=sensor.poke_interval) + sensor.retry_delay - with freeze_time(date3): - self._run(sensor) + date3 = date1 + timedelta(seconds=sensor.poke_interval) * 2 + sensor.retry_delay + time_machine.coordinates.shift(sensor.poke_interval + sensor.retry_delay.total_seconds()) + self._run(sensor) tis = dr.get_task_instances() assert len(tis) == 2 for ti in tis: @@ -337,9 +333,10 @@ class TestBaseSensor: assert ti.state == State.NONE # fourth poke return True and task succeeds - date4 = date3 + timedelta(seconds=sensor.poke_interval) - with freeze_time(date4): - self._run(sensor) + + time_machine.coordinates.shift(sensor.poke_interval) + self._run(sensor) + tis = dr.get_task_instances() assert len(tis) == 2 for ti in tis: @@ -368,7 +365,7 @@ class TestBaseSensor: ) # first poke returns False and task is re-scheduled - with freeze_time(date1): + with time_machine.travel(date1, tick=False): self._run(sensor) tis = dr.get_task_instances() assert len(tis) == 2 @@ -385,7 +382,7 @@ class TestBaseSensor: assert ti.state == State.NONE # second poke returns False and task is re-scheduled - with freeze_time(date2): + with time_machine.travel(date2, tick=False): self._run(sensor) tis = dr.get_task_instances() assert len(tis) == 2 @@ -402,7 +399,7 @@ class TestBaseSensor: assert ti.state == State.NONE # third poke returns True and task succeeds - with freeze_time(date3): + with time_machine.travel(date3, tick=False): self._run(sensor) tis = dr.get_task_instances() assert len(tis) == 2 @@ -418,7 +415,7 @@ class TestBaseSensor: # poke returns False and AirflowRescheduleException is raised date1 = timezone.utcnow() - with freeze_time(date1): + with time_machine.travel(date1, tick=False): self._run(sensor, test_mode=True) tis = dr.get_task_instances() assert len(tis) == 2 @@ -545,7 +542,7 @@ class TestBaseSensor: sensor, _ = make_sensor(poke_interval=60 * 60 * 24, mode="reschedule", return_value=False) # A few hours until TIMESTAMP's limit, the next poke will take us over. - with freeze_time(datetime(2038, 1, 19, tzinfo=timezone.utc)): + with time_machine.travel(datetime(2038, 1, 19, tzinfo=timezone.utc), tick=False): with pytest.raises(AirflowSensorTimeout) as ctx: self._run(sensor) assert str(ctx.value) == ( @@ -553,7 +550,7 @@ class TestBaseSensor: "since it is over MySQL's TIMESTAMP storage limit." ) - def test_reschedule_and_retry_timeout(self, make_sensor): + def test_reschedule_and_retry_timeout(self, make_sensor, time_machine): """ Test mode="reschedule", retries and timeout configurations interact correctly. @@ -606,42 +603,40 @@ class TestBaseSensor: # first poke returns False and task is re-scheduled date1 = timezone.utcnow() - with freeze_time(date1): - self._run(sensor) + time_machine.move_to(date1, tick=False) + self._run(sensor) assert_ti_state(1, 2, State.UP_FOR_RESCHEDULE) # second poke raises RuntimeError and task instance retries - date2 = date1 + timedelta(seconds=sensor.poke_interval) - with freeze_time(date2), pytest.raises(RuntimeError): + time_machine.coordinates.shift(sensor.poke_interval) + with pytest.raises(RuntimeError): self._run(sensor) assert_ti_state(2, 2, State.UP_FOR_RETRY) # third poke returns False and task is rescheduled again - date3 = date2 + sensor.retry_delay + timedelta(seconds=1) - with freeze_time(date3): - self._run(sensor) + time_machine.coordinates.shift(sensor.retry_delay + timedelta(seconds=1)) + self._run(sensor) assert_ti_state(2, 2, State.UP_FOR_RESCHEDULE) # fourth poke times out and raises AirflowSensorTimeout - date4 = date3 + timedelta(seconds=sensor.poke_interval) - with freeze_time(date4), pytest.raises(AirflowSensorTimeout): + time_machine.coordinates.shift(sensor.poke_interval) + with pytest.raises(AirflowSensorTimeout): self._run(sensor) assert_ti_state(3, 2, State.FAILED) # Clear the failed sensor sensor.clear() - date_i = date4 + timedelta(seconds=20) + time_machine.coordinates.shift(20) for _ in range(3): - date_i += timedelta(seconds=sensor.poke_interval) - with freeze_time(date_i): - self._run(sensor) + time_machine.coordinates.shift(sensor.poke_interval) + self._run(sensor) assert_ti_state(3, 4, State.UP_FOR_RESCHEDULE) # Last poke times out and raises AirflowSensorTimeout - date8 = date_i + timedelta(seconds=sensor.poke_interval) - with freeze_time(date8), pytest.raises(AirflowSensorTimeout): + time_machine.coordinates.shift(sensor.poke_interval) + with pytest.raises(AirflowSensorTimeout): self._run(sensor) assert_ti_state(4, 4, State.FAILED) diff --git a/tests/sensors/test_time_sensor.py b/tests/sensors/test_time_sensor.py index 70b875311c..2ccfdd2c42 100644 --- a/tests/sensors/test_time_sensor.py +++ b/tests/sensors/test_time_sensor.py @@ -20,9 +20,9 @@ from __future__ import annotations from datetime import datetime, time from unittest.mock import patch -import freezegun import pendulum import pytest +import time_machine from airflow.exceptions import TaskDeferred from airflow.models.dag import DAG @@ -35,10 +35,6 @@ DEFAULT_DATE_WO_TZ = datetime(2015, 1, 1) DEFAULT_DATE_WITH_TZ = datetime(2015, 1, 1, tzinfo=pendulum.tz.timezone(DEFAULT_TIMEZONE)) -@patch( - "airflow.sensors.time_sensor.timezone.utcnow", - return_value=timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc), -) class TestTimeSensor: @pytest.mark.parametrize( "default_timezone, start_date, expected", @@ -48,7 +44,8 @@ class TestTimeSensor: (DEFAULT_TIMEZONE, DEFAULT_DATE_WO_TZ, False), ], ) - def test_timezone(self, mock_utcnow, default_timezone, start_date, expected): + @time_machine.travel(timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc)) + def test_timezone(self, default_timezone, start_date, expected): with patch("airflow.settings.TIMEZONE", pendulum.timezone(default_timezone)): dag = DAG("test", default_args={"start_date": start_date}) op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag) @@ -56,7 +53,7 @@ class TestTimeSensor: class TestTimeSensorAsync: - @freezegun.freeze_time("2020-07-07 00:00:00") + @time_machine.travel("2020-07-07 00:00:00", tick=False) def test_task_is_deferred(self): with DAG("test_task_is_deferred", start_date=timezone.datetime(2020, 1, 1, 23, 0)): op = TimeSensorAsync(task_id="test", target_time=time(10, 0)) diff --git a/tests/ti_deps/deps/test_not_in_retry_period_dep.py b/tests/ti_deps/deps/test_not_in_retry_period_dep.py index 107376d2a3..2abf42273a 100644 --- a/tests/ti_deps/deps/test_not_in_retry_period_dep.py +++ b/tests/ti_deps/deps/test_not_in_retry_period_dep.py @@ -20,7 +20,7 @@ from __future__ import annotations from datetime import timedelta from unittest.mock import Mock -from freezegun import freeze_time +import time_machine from airflow.models import TaskInstance from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep @@ -35,7 +35,7 @@ class TestNotInRetryPeriodDep: ti.end_date = end_date return ti - @freeze_time("2016-01-01 15:44") + @time_machine.travel("2016-01-01 15:44") def test_still_in_retry_period(self): """ Task instances that are in their retry period should fail this dep @@ -44,7 +44,7 @@ class TestNotInRetryPeriodDep: assert ti.is_premature assert not NotInRetryPeriodDep().is_met(ti=ti) - @freeze_time("2016-01-01 15:46") + @time_machine.travel("2016-01-01 15:46") def test_retry_period_finished(self): """ Task instance's that have had their retry period elapse should pass this dep diff --git a/tests/ti_deps/deps/test_runnable_exec_date_dep.py b/tests/ti_deps/deps/test_runnable_exec_date_dep.py index eebde0cd42..4df559e489 100644 --- a/tests/ti_deps/deps/test_runnable_exec_date_dep.py +++ b/tests/ti_deps/deps/test_runnable_exec_date_dep.py @@ -20,7 +20,7 @@ from __future__ import annotations from unittest.mock import Mock, patch import pytest -from freezegun import freeze_time +import time_machine from airflow import settings from airflow.models import DagRun, TaskInstance @@ -36,7 +36,7 @@ def clean_db(session): session.query(TaskInstance).delete() -@freeze_time("2016-11-01") +@time_machine.travel("2016-11-01") @pytest.mark.parametrize( "allow_trigger_in_future,schedule_interval,execution_date,is_met", [ @@ -74,7 +74,7 @@ def test_exec_date_dep( assert RunnableExecDateDep().is_met(ti=ti) == is_met -@freeze_time("2016-01-01") +@time_machine.travel("2016-01-01") def test_exec_date_after_end_date(session, dag_maker, create_dummy_dag): """ If the dag's execution date is in the future this dep should fail diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py index 3c324672a7..596c274cf7 100644 --- a/tests/timetables/test_interval_timetable.py +++ b/tests/timetables/test_interval_timetable.py @@ -20,9 +20,9 @@ from __future__ import annotations import datetime import dateutil.relativedelta -import freezegun import pendulum import pytest +import time_machine from airflow.exceptions import AirflowTimetableInvalid from airflow.settings import TIMEZONE @@ -52,7 +52,7 @@ DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16) "last_automated_data_interval", [pytest.param(None, id="first-run"), pytest.param(PREV_DATA_INTERVAL, id="subsequent")], ) [email protected]_time(CURRENT_TIME) +@time_machine.travel(CURRENT_TIME) def test_no_catchup_first_starts_at_current_time( last_automated_data_interval: DataInterval | None, ) -> None: @@ -73,7 +73,7 @@ def test_no_catchup_first_starts_at_current_time( "catchup", [pytest.param(True, id="catchup_true"), pytest.param(False, id="catchup_false")], ) [email protected]_time(CURRENT_TIME) +@time_machine.travel(CURRENT_TIME) def test_new_schedule_interval_next_info_starts_at_new_time( earliest: pendulum.DateTime | None, catchup: bool, @@ -100,7 +100,7 @@ def test_new_schedule_interval_next_info_starts_at_new_time( "last_automated_data_interval", [pytest.param(None, id="first-run"), pytest.param(PREV_DATA_INTERVAL, id="subsequent")], ) [email protected]_time(CURRENT_TIME) +@time_machine.travel(CURRENT_TIME) def test_no_catchup_next_info_starts_at_current_time( timetable: Timetable, last_automated_data_interval: DataInterval | None, diff --git a/tests/timetables/test_trigger_timetable.py b/tests/timetables/test_trigger_timetable.py index cabb1198ef..6f1d44479f 100644 --- a/tests/timetables/test_trigger_timetable.py +++ b/tests/timetables/test_trigger_timetable.py @@ -20,10 +20,10 @@ import datetime import typing import dateutil.relativedelta -import freezegun import pendulum import pendulum.tz import pytest +import time_machine from airflow.exceptions import AirflowTimetableInvalid from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction @@ -63,7 +63,7 @@ DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16) ), ], ) [email protected]_time(CURRENT_TIME) +@time_machine.travel(CURRENT_TIME) def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule( last_automated_data_interval: DataInterval | None, next_start_time: pendulum.DateTime, @@ -105,7 +105,7 @@ def test_hourly_cron_trigger_no_catchup_next_info( earliest: pendulum.DateTime, expected: DagRunInfo, ) -> None: - with freezegun.freeze_time(current_time): + with time_machine.travel(current_time): next_info = HOURLY_CRON_TRIGGER_TIMETABLE.next_dagrun_info( last_automated_data_interval=PREV_DATA_INTERVAL_EXACT, restriction=TimeRestriction(earliest=earliest, latest=None, catchup=False), diff --git a/tests/utils/log/test_file_processor_handler.py b/tests/utils/log/test_file_processor_handler.py index 1d479c71da..e11ff35b0c 100644 --- a/tests/utils/log/test_file_processor_handler.py +++ b/tests/utils/log/test_file_processor_handler.py @@ -21,7 +21,7 @@ import os import shutil from datetime import timedelta -from freezegun import freeze_time +import time_machine from airflow.utils import timezone from airflow.utils.log.file_processor_handler import FileProcessorHandler @@ -77,13 +77,13 @@ class TestFileProcessorHandler: link = os.path.join(self.base_log_folder, "latest") - with freeze_time(date1): + with time_machine.travel(date1, tick=False): handler.set_context(filename=os.path.join(self.dag_dir, "log1")) assert os.path.islink(link) assert os.path.basename(os.readlink(link)) == date1 assert os.path.exists(os.path.join(link, "log1")) - with freeze_time(date2): + with time_machine.travel(date2, tick=False): handler.set_context(filename=os.path.join(self.dag_dir, "log2")) assert os.path.islink(link) assert os.path.basename(os.readlink(link)) == date2 @@ -104,7 +104,7 @@ class TestFileProcessorHandler: os.remove(link) os.makedirs(link) - with freeze_time(date1): + with time_machine.travel(date1, tick=False): handler.set_context(filename=os.path.join(self.dag_dir, "log1")) def teardown_method(self): diff --git a/tests/utils/test_serve_logs.py b/tests/utils/test_serve_logs.py index e306c50e2b..5288e646ed 100644 --- a/tests/utils/test_serve_logs.py +++ b/tests/utils/test_serve_logs.py @@ -21,7 +21,7 @@ from typing import TYPE_CHECKING import jwt import pytest -from freezegun import freeze_time +import time_machine from airflow.utils.jwt_signer import JWTSigner from airflow.utils.serve_logs import create_app @@ -92,7 +92,7 @@ class TestServeLogs: assert response.status_code == 403 def test_forbidden_expired(self, client: FlaskClient, signer): - with freeze_time("2010-01-14"): + with time_machine.travel("2010-01-14"): token = signer.generate_signed_token({"filename": "sample.log"}) assert ( client.get( @@ -105,7 +105,7 @@ class TestServeLogs: ) def test_forbidden_future(self, client: FlaskClient, signer): - with freeze_time(datetime.datetime.utcnow() + datetime.timedelta(seconds=3600)): + with time_machine.travel(datetime.datetime.utcnow() + datetime.timedelta(seconds=3600)): token = signer.generate_signed_token({"filename": "sample.log"}) assert ( client.get( @@ -118,7 +118,7 @@ class TestServeLogs: ) def test_ok_with_short_future_skew(self, client: FlaskClient, signer): - with freeze_time(datetime.datetime.utcnow() + datetime.timedelta(seconds=1)): + with time_machine.travel(datetime.datetime.utcnow() + datetime.timedelta(seconds=1)): token = signer.generate_signed_token({"filename": "sample.log"}) assert ( client.get( @@ -131,7 +131,7 @@ class TestServeLogs: ) def test_ok_with_short_past_skew(self, client: FlaskClient, signer): - with freeze_time(datetime.datetime.utcnow() - datetime.timedelta(seconds=31)): + with time_machine.travel(datetime.datetime.utcnow() - datetime.timedelta(seconds=31)): token = signer.generate_signed_token({"filename": "sample.log"}) assert ( client.get( @@ -144,7 +144,7 @@ class TestServeLogs: ) def test_forbidden_with_long_future_skew(self, client: FlaskClient, signer): - with freeze_time(datetime.datetime.utcnow() + datetime.timedelta(seconds=10)): + with time_machine.travel(datetime.datetime.utcnow() + datetime.timedelta(seconds=10)): token = signer.generate_signed_token({"filename": "sample.log"}) assert ( client.get( @@ -157,7 +157,7 @@ class TestServeLogs: ) def test_forbidden_with_long_past_skew(self, client: FlaskClient, signer): - with freeze_time(datetime.datetime.utcnow() - datetime.timedelta(seconds=40)): + with time_machine.travel(datetime.datetime.utcnow() - datetime.timedelta(seconds=40)): token = signer.generate_signed_token({"filename": "sample.log"}) assert ( client.get( diff --git a/tests/www/test_security.py b/tests/www/test_security.py index b34842a92d..31e6ca3b46 100644 --- a/tests/www/test_security.py +++ b/tests/www/test_security.py @@ -23,9 +23,9 @@ import logging from unittest import mock import pytest +import time_machine from flask_appbuilder import SQLA, Model, expose, has_access from flask_appbuilder.views import BaseView, ModelView -from freezegun import freeze_time from sqlalchemy import Column, Date, Float, Integer, String from airflow.exceptions import AirflowException @@ -904,7 +904,7 @@ def old_user(): return user -@freeze_time(datetime.datetime(1985, 11, 5, 1, 24, 0)) # Get the Delorean, doc! +@time_machine.travel(datetime.datetime(1985, 11, 5, 1, 24, 0), tick=False) def test_update_user_auth_stat_first_successful_auth(mock_security_manager, new_user): mock_security_manager.update_user_auth_stat(new_user, success=True) @@ -914,7 +914,7 @@ def test_update_user_auth_stat_first_successful_auth(mock_security_manager, new_ assert mock_security_manager.update_user.called_once -@freeze_time(datetime.datetime(1985, 11, 5, 1, 24, 0)) +@time_machine.travel(datetime.datetime(1985, 11, 5, 1, 24, 0), tick=False) def test_update_user_auth_stat_subsequent_successful_auth(mock_security_manager, old_user): mock_security_manager.update_user_auth_stat(old_user, success=True) @@ -924,7 +924,7 @@ def test_update_user_auth_stat_subsequent_successful_auth(mock_security_manager, assert mock_security_manager.update_user.called_once -@freeze_time(datetime.datetime(1985, 11, 5, 1, 24, 0)) +@time_machine.travel(datetime.datetime(1985, 11, 5, 1, 24, 0), tick=False) def test_update_user_auth_stat_first_unsuccessful_auth(mock_security_manager, new_user): mock_security_manager.update_user_auth_stat(new_user, success=False) @@ -934,7 +934,7 @@ def test_update_user_auth_stat_first_unsuccessful_auth(mock_security_manager, ne assert mock_security_manager.update_user.called_once -@freeze_time(datetime.datetime(1985, 11, 5, 1, 24, 0)) +@time_machine.travel(datetime.datetime(1985, 11, 5, 1, 24, 0), tick=False) def test_update_user_auth_stat_subsequent_unsuccessful_auth(mock_security_manager, old_user): mock_security_manager.update_user_auth_stat(old_user, success=False) diff --git a/tests/www/views/test_views_grid.py b/tests/www/views/test_views_grid.py index 40dd1ce917..bc17ddc5de 100644 --- a/tests/www/views/test_views_grid.py +++ b/tests/www/views/test_views_grid.py @@ -17,8 +17,6 @@ # under the License. from __future__ import annotations -from datetime import datetime, timedelta - import pendulum import pytest from dateutil.tz import UTC @@ -29,6 +27,7 @@ from airflow.models import DagBag from airflow.models.dagrun import DagRun from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel from airflow.operators.empty import EmptyOperator +from airflow.utils import timezone from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils.task_group import TaskGroup from airflow.utils.types import DagRunType @@ -129,6 +128,14 @@ def test_no_runs(admin_client, dag_without_runs): } +# Create this as a fixture so that it is applied before the `dag_with_runs` fixture is! [email protected] +def freeze_time_for_dagruns(time_machine): + time_machine.move_to("2022-01-02T00:00:00+00:00", tick=False) + yield + + [email protected]("freeze_time_for_dagruns") def test_one_run(admin_client, dag_with_runs: list[DagRun], session): """ Test a DAG with complex interaction of states: @@ -159,22 +166,14 @@ def test_one_run(admin_client, dag_with_runs: list[DagRun], session): assert resp.status_code == 200, resp.json - # We cannot use freezegun here as it does not play well with Flask 2.2 and SqlAlchemy - # Unlike real datetime, when FakeDatetime is used, it coerces to - # '2020-08-06 09:00:00+00:00' which is rejected by MySQL for EXPIRY Column - current_date_placeholder = "2022-01-02T00:00:00+00:00" - actual_date_in_json = datetime.fromisoformat(resp.json["dag_runs"][0]["end_date"]) - assert datetime.now(tz=UTC) - actual_date_in_json < timedelta(minutes=5) - res = resp.json - res["dag_runs"][0]["end_date"] = current_date_placeholder - assert res == { + assert resp.json == { "dag_runs": [ { "conf": None, "conf_is_json": False, "data_interval_end": "2016-01-02T00:00:00+00:00", "data_interval_start": "2016-01-01T00:00:00+00:00", - "end_date": current_date_placeholder, + "end_date": timezone.utcnow().isoformat(), "execution_date": "2016-01-01T00:00:00+00:00", "external_trigger": False, "last_scheduling_decision": None, diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py index ab3b40f0b2..1543db30f6 100644 --- a/tests/www/views/test_views_tasks.py +++ b/tests/www/views/test_views_tasks.py @@ -22,10 +22,9 @@ import json import re import unittest.mock import urllib.parse -from datetime import timedelta -import freezegun import pytest +import time_machine from airflow import settings from airflow.exceptions import AirflowException @@ -61,7 +60,7 @@ def reset_dagruns(): @pytest.fixture(autouse=True) def init_dagruns(app, reset_dagruns): - with freezegun.freeze_time(DEFAULT_DATE): + with time_machine.travel(DEFAULT_DATE, tick=False): app.dag_bag.get_dag("example_bash_operator").create_dagrun( run_id=DEFAULT_DAGRUN, run_type=DagRunType.SCHEDULED, @@ -563,7 +562,7 @@ def test_run_with_runnable_states(_, admin_client, session, state): "airflow.executors.executor_loader.ExecutorLoader.get_default_executor", return_value=_ForceHeartbeatCeleryExecutor(), ) -def test_run_ignoring_deps_sets_queued_dttm(_, admin_client, session): +def test_run_ignoring_deps_sets_queued_dttm(_, admin_client, session, time_machine): task_id = "runme_0" session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update( {"state": State.SCHEDULED, "queued_dttm": None} @@ -579,15 +578,13 @@ def test_run_ignoring_deps_sets_queued_dttm(_, admin_client, session): dag_run_id=DEFAULT_DAGRUN, origin="/home", ) + now = timezone.utcnow() + + time_machine.move_to(now, tick=False) resp = admin_client.post("run", data=form, follow_redirects=True) assert resp.status_code == 200 - # We cannot use freezegun here as it does not play well with Flask 2.2 and SqlAlchemy - # Unlike real datetime, when FakeDatetime is used, it coerces to - # '2020-08-06 09:00:00+00:00' which is rejected by MySQL for EXPIRY Column - assert timezone.utcnow() - session.query(TaskInstance.queued_dttm).filter( - TaskInstance.task_id == task_id - ).scalar() < timedelta(minutes=5) + assert session.query(TaskInstance.queued_dttm).filter(TaskInstance.task_id == task_id).scalar() == now @pytest.mark.parametrize("state", QUEUEABLE_STATES)
