This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ab79b542b Remove deprecations in airflow.models.dag (#41774)
5ab79b542b is described below

commit 5ab79b542bee044eed8ba97b752fa9005d432d0f
Author: Jens Scheffler <[email protected]>
AuthorDate: Thu Aug 29 00:25:33 2024 +0200

    Remove deprecations in airflow.models.dag (#41774)
---
 airflow/models/dag.py                         | 171 ++------------
 airflow/models/taskinstance.py                |  10 +-
 newsfragments/41774.significant.rst           |  12 +
 tests/models/test_dag.py                      | 306 --------------------------
 tests/serialization/test_dag_serialization.py |   8 +-
 5 files changed, 37 insertions(+), 470 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 93ff43711e..96d6216c5d 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -28,7 +28,6 @@ import pickle
 import sys
 import time
 import traceback
-import warnings
 import weakref
 from collections import abc, defaultdict, deque
 from contextlib import ExitStack
@@ -90,7 +89,6 @@ from airflow.exceptions import (
     DuplicateTaskIdFound,
     FailStopDagInvalidTriggerRule,
     ParamValidationError,
-    RemovedInAirflow3Warning,
     TaskDeferred,
     TaskNotFound,
     UnknownExecutorException,
@@ -511,13 +509,11 @@ class DAG(LoggingMixin):
         schedule: ScheduleArg = None,
         start_date: datetime | None = None,
         end_date: datetime | None = None,
-        full_filepath: str | None = None,
         template_searchpath: str | Iterable[str] | None = None,
         template_undefined: type[jinja2.StrictUndefined] = 
jinja2.StrictUndefined,
         user_defined_macros: dict | None = None,
         user_defined_filters: dict | None = None,
         default_args: dict | None = None,
-        concurrency: int | None = None,
         max_active_tasks: int = airflow_conf.getint("core", 
"max_active_tasks_per_dag"),
         max_active_runs: int = airflow_conf.getint("core", 
"max_active_runs_per_dag"),
         max_consecutive_failed_dag_runs: int = airflow_conf.getint(
@@ -563,26 +559,11 @@ class DAG(LoggingMixin):
         # check self.params and convert them into ParamsDict
         self.params = ParamsDict(params)
 
-        if full_filepath:
-            warnings.warn(
-                "Passing full_filepath to DAG() is deprecated and has no 
effect",
-                RemovedInAirflow3Warning,
-                stacklevel=2,
-            )
-
         validate_key(dag_id)
 
         self._dag_id = dag_id
         self._dag_display_property_value = dag_display_name
 
-        if concurrency:
-            # TODO: Remove in Airflow 3.0
-            warnings.warn(
-                "The 'concurrency' parameter is deprecated. Please use 
'max_active_tasks'.",
-                RemovedInAirflow3Warning,
-                stacklevel=2,
-            )
-            max_active_tasks = concurrency
         self._max_active_tasks = max_active_tasks
         self._pickle_id: int | None = None
 
@@ -662,13 +643,6 @@ class DAG(LoggingMixin):
         self.sla_miss_callback = sla_miss_callback
         if default_view in DEFAULT_VIEW_PRESETS:
             self._default_view: str = default_view
-        elif default_view == "tree":
-            warnings.warn(
-                "`default_view` of 'tree' has been renamed to 'grid' -- please 
update your DAG",
-                RemovedInAirflow3Warning,
-                stacklevel=2,
-            )
-            self._default_view = "grid"
         else:
             raise AirflowException(
                 f"Invalid values of dag.default_view: only support "
@@ -819,30 +793,9 @@ class DAG(LoggingMixin):
 
     @staticmethod
     def _upgrade_outdated_dag_access_control(access_control=None):
-        """
-        Look for outdated dag level actions in DAG access_controls and replace 
them with updated actions.
-
-        For example, in DAG access_control {'role1': {'can_dag_read'}} 
'can_dag_read'
-        will be replaced with 'can_read', in {'role2': {'can_dag_read', 
'can_dag_edit'}}
-        'can_dag_edit' will be replaced with 'can_edit', etc.
-        """
+        """Look for outdated dag level actions in DAG access_controls and 
replace them with updated actions."""
         if access_control is None:
             return None
-        new_dag_perm_mapping = {
-            permissions.DEPRECATED_ACTION_CAN_DAG_READ: 
permissions.ACTION_CAN_READ,
-            permissions.DEPRECATED_ACTION_CAN_DAG_EDIT: 
permissions.ACTION_CAN_EDIT,
-        }
-
-        def update_old_perm(permission: str):
-            new_perm = new_dag_perm_mapping.get(permission, permission)
-            if new_perm != permission:
-                warnings.warn(
-                    f"The '{permission}' permission is deprecated. Please use 
'{new_perm}'.",
-                    RemovedInAirflow3Warning,
-                    stacklevel=3,
-                )
-            return new_perm
-
         updated_access_control = {}
         for role, perms in access_control.items():
             if packaging_version.parse(FAB_VERSION) >= 
packaging_version.parse("1.3.0"):
@@ -852,11 +805,6 @@ class DAG(LoggingMixin):
                     updated_access_control[role][permissions.RESOURCE_DAG] = 
set(perms)
                 else:
                     updated_access_control[role] = perms
-                if permissions.RESOURCE_DAG in updated_access_control[role]:
-                    updated_access_control[role][permissions.RESOURCE_DAG] = {
-                        update_old_perm(perm)
-                        for perm in 
updated_access_control[role][permissions.RESOURCE_DAG]
-                    }
             elif isinstance(perms, dict):
                 # Not allow new access control format with old FAB versions
                 raise AirflowException(
@@ -864,40 +812,10 @@ class DAG(LoggingMixin):
                     "use the Dag Level Access Control new format."
                 )
             else:
-                updated_access_control[role] = {update_old_perm(perm) for perm 
in perms}
+                updated_access_control[role] = set(perms)
 
         return updated_access_control
 
-    def following_schedule(self, dttm):
-        """
-        Calculate the following schedule for this dag in UTC.
-
-        :param dttm: utc datetime
-        :return: utc datetime
-        """
-        warnings.warn(
-            "`DAG.following_schedule()` is deprecated. Use 
`DAG.next_dagrun_info(restricted=False)` instead.",
-            category=RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        data_interval = 
self.infer_automated_data_interval(timezone.coerce_datetime(dttm))
-        next_info = self.next_dagrun_info(data_interval, restricted=False)
-        if next_info is None:
-            return None
-        return next_info.data_interval.start
-
-    def previous_schedule(self, dttm):
-        from airflow.timetables.interval import _DataIntervalTimetable
-
-        warnings.warn(
-            "`DAG.previous_schedule()` is deprecated.",
-            category=RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        if not isinstance(self.timetable, _DataIntervalTimetable):
-            return None
-        return self.timetable._get_prev(timezone.coerce_datetime(dttm))
-
     def get_next_data_interval(self, dag_model: DagModel) -> DataInterval | 
None:
         """
         Get the data interval of the next scheduled run.
@@ -979,7 +897,7 @@ class DAG(LoggingMixin):
 
     def next_dagrun_info(
         self,
-        last_automated_dagrun: None | datetime | DataInterval,
+        last_automated_dagrun: None | DataInterval,
         *,
         restricted: bool = True,
     ) -> DagRunInfo | None:
@@ -1004,16 +922,10 @@ class DAG(LoggingMixin):
         """
         data_interval = None
         if isinstance(last_automated_dagrun, datetime):
-            warnings.warn(
-                "Passing a datetime to DAG.next_dagrun_info is deprecated. Use 
a DataInterval instead.",
-                RemovedInAirflow3Warning,
-                stacklevel=2,
+            raise ValueError(
+                "Passing a datetime to DAG.next_dagrun_info is not supported 
anymore. Use a DataInterval instead."
             )
-            data_interval = self.infer_automated_data_interval(
-                timezone.coerce_datetime(last_automated_dagrun)
-            )
-        else:
-            data_interval = last_automated_dagrun
+        data_interval = last_automated_dagrun
         if restricted:
             restriction = self._time_restriction
         else:
@@ -1274,16 +1186,6 @@ class DAG(LoggingMixin):
         """Return a boolean indicating whether this DAG is paused."""
         return session.scalar(select(DagModel.is_paused).where(DagModel.dag_id 
== self.dag_id))
 
-    @property
-    def is_paused(self):
-        """Use `airflow.models.DAG.get_is_paused`, this attribute is 
deprecated."""
-        warnings.warn(
-            "This attribute is deprecated. Please use 
`airflow.models.DAG.get_is_paused` method.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        return self.get_is_paused()
-
     @staticmethod
     @internal_api_call
     @provide_session
@@ -2009,9 +1911,6 @@ class DAG(LoggingMixin):
         dag_run_state: DagRunState = DagRunState.QUEUED,
         dry_run: bool = False,
         session: Session = NEW_SESSION,
-        get_tis: bool = False,
-        recursion_depth: int = 0,
-        max_recursion_depth: int | None = None,
         dag_bag: DagBag | None = None,
         exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = 
frozenset(),
     ) -> int | Iterable[TaskInstance]:
@@ -2032,27 +1931,6 @@ class DAG(LoggingMixin):
         :param exclude_task_ids: A set of ``task_id`` or (``task_id``, 
``map_index``)
             tuples that should not be cleared
         """
-        if get_tis:
-            warnings.warn(
-                "Passing `get_tis` to dag.clear() is deprecated. Use `dry_run` 
parameter instead.",
-                RemovedInAirflow3Warning,
-                stacklevel=2,
-            )
-            dry_run = True
-
-        if recursion_depth:
-            warnings.warn(
-                "Passing `recursion_depth` to dag.clear() is deprecated.",
-                RemovedInAirflow3Warning,
-                stacklevel=2,
-            )
-        if max_recursion_depth:
-            warnings.warn(
-                "Passing `max_recursion_depth` to dag.clear() is deprecated.",
-                RemovedInAirflow3Warning,
-                stacklevel=2,
-            )
-
         state: list[TaskInstanceState] = []
         if only_failed:
             state += [TaskInstanceState.FAILED, 
TaskInstanceState.UPSTREAM_FAILED]
@@ -2706,15 +2584,9 @@ class DAG(LoggingMixin):
             data_interval = DataInterval(*map(timezone.coerce_datetime, 
data_interval))
 
         if data_interval is None and logical_date is not None:
-            warnings.warn(
-                "Calling `DAG.create_dagrun()` without an explicit data 
interval is deprecated",
-                RemovedInAirflow3Warning,
-                stacklevel=3,
+            raise ValueError(
+                "Calling `DAG.create_dagrun()` without an explicit data 
interval is not supported."
             )
-            if run_type == DagRunType.MANUAL:
-                data_interval = 
self.timetable.infer_manual_data_interval(run_after=logical_date)
-            else:
-                data_interval = 
self.infer_automated_data_interval(logical_date)
 
         if run_type is None or isinstance(run_type, DagRunType):
             pass
@@ -3410,18 +3282,10 @@ class DagModel(Base):
         "scheduler", "max_dagruns_to_create_per_loop", fallback=10
     )
 
-    def __init__(self, concurrency=None, **kwargs):
+    def __init__(self, **kwargs):
         super().__init__(**kwargs)
         if self.max_active_tasks is None:
-            if concurrency:
-                warnings.warn(
-                    "The 'DagModel.concurrency' parameter is deprecated. 
Please use 'max_active_tasks'.",
-                    RemovedInAirflow3Warning,
-                    stacklevel=2,
-                )
-                self.max_active_tasks = concurrency
-            else:
-                self.max_active_tasks = airflow_conf.getint("core", "max_active_tasks_per_dag")
+            self.max_active_tasks = airflow_conf.getint("core", "max_active_tasks_per_dag")
 
         if self.max_active_runs is None:
             self.max_active_runs = airflow_conf.getint("core", 
"max_active_runs_per_dag")
@@ -3669,7 +3533,7 @@ class DagModel(Base):
     def calculate_dagrun_date_fields(
         self,
         dag: DAG,
-        last_automated_dag_run: None | datetime | DataInterval,
+        last_automated_dag_run: None | DataInterval,
     ) -> None:
         """
         Calculate ``next_dagrun`` and `next_dagrun_create_after``.
@@ -3680,13 +3544,10 @@ class DagModel(Base):
         """
         last_automated_data_interval: DataInterval | None
         if isinstance(last_automated_dag_run, datetime):
-            warnings.warn(
-                "Passing a datetime to `DagModel.calculate_dagrun_date_fields` 
is deprecated. "
-                "Provide a data interval instead.",
-                RemovedInAirflow3Warning,
-                stacklevel=2,
+            raise ValueError(
+                "Passing a datetime to `DagModel.calculate_dagrun_date_fields` 
is not supported. "
+                "Provide a data interval instead."
             )
-            last_automated_data_interval = 
dag.infer_automated_data_interval(last_automated_dag_run)
         else:
             last_automated_data_interval = last_automated_dag_run
         next_dagrun_info = dag.next_dagrun_info(last_automated_data_interval)
@@ -3719,13 +3580,11 @@ def dag(
     schedule: ScheduleArg = None,
     start_date: datetime | None = None,
     end_date: datetime | None = None,
-    full_filepath: str | None = None,
     template_searchpath: str | Iterable[str] | None = None,
     template_undefined: type[jinja2.StrictUndefined] = jinja2.StrictUndefined,
     user_defined_macros: dict | None = None,
     user_defined_filters: dict | None = None,
     default_args: dict | None = None,
-    concurrency: int | None = None,
     max_active_tasks: int = airflow_conf.getint("core", 
"max_active_tasks_per_dag"),
     max_active_runs: int = airflow_conf.getint("core", 
"max_active_runs_per_dag"),
     max_consecutive_failed_dag_runs: int = airflow_conf.getint(
@@ -3775,13 +3634,11 @@ def dag(
                 description=description,
                 start_date=start_date,
                 end_date=end_date,
-                full_filepath=full_filepath,
                 template_searchpath=template_searchpath,
                 template_undefined=template_undefined,
                 user_defined_macros=user_defined_macros,
                 user_defined_filters=user_defined_filters,
                 default_args=default_args,
-                concurrency=concurrency,
                 max_active_tasks=max_active_tasks,
                 max_active_runs=max_active_runs,
                 
max_consecutive_failed_dag_runs=max_consecutive_failed_dag_runs,
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 1d3909aed4..0d633f8bf3 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1048,9 +1048,13 @@ def _get_template_context(
         # for manually triggered tasks, i.e. triggered_date == execution_date.
         if dag_run.external_trigger:
             return logical_date
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return dag.previous_schedule(logical_date)
+
+        # Workaround code copy until deprecated context fields are removed in Airflow 3
+        from airflow.timetables.interval import _DataIntervalTimetable
+
+        if not isinstance(dag.timetable, _DataIntervalTimetable):
+            return None
+        return dag.timetable._get_prev(timezone.coerce_datetime(logical_date))
 
     @cache
     def get_prev_ds() -> str | None:
diff --git a/newsfragments/41774.significant.rst b/newsfragments/41774.significant.rst
new file mode 100644
index 0000000000..2d6456fb14
--- /dev/null
+++ b/newsfragments/41774.significant.rst
@@ -0,0 +1,12 @@
+Removed a set of deprecations in ``DAG`` from ``airflow.models``.
+
+- Removed the deprecated parameters ``full_filepath`` and ``concurrency`` (replaced by ``max_active_tasks``) from ``DAG`` and the ``@dag`` decorator.
+- Removed fallback support for the legacy ``default_view == "tree"``.
+- Removed legacy support for the permissions named ``can_dag_read`` and ``can_dag_edit``. The permissions need to be named ``can_read`` and ``can_edit``.
+- Removed the deprecated methods ``following_schedule()`` and ``previous_schedule()``.
+- Removed deprecated support for passing a ``datetime`` to ``next_dagrun_info()``. Pass a ``DataInterval`` instead.
+- Removed the legacy DAG property ``is_paused``. Use ``get_is_paused()`` instead.
+- Removed the legacy parameters ``get_tis``, ``recursion_depth`` and ``max_recursion_depth`` from ``DAG.clear()``.
+- Removed implicit support for calling ``create_dagrun()`` without an explicit data interval.
+- Removed support for the deprecated parameter ``concurrency`` in ``DagModel``.
+- Removed support for passing a ``datetime`` to ``DagModel.calculate_dagrun_date_fields()``. Pass a ``DataInterval`` instead.
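
For readers migrating DAG files, a minimal Python sketch of the replacements follows. It is based only on the removed implementations and signatures visible in this patch; the ``example`` DAG and the ``dttm`` variable are illustrative and not part of the commit.

from airflow import DAG
from airflow.utils import timezone

# concurrency= was removed from DAG() and @dag; use max_active_tasks instead.
dag = DAG(
    dag_id="example",
    schedule="@hourly",
    start_date=timezone.datetime(2024, 1, 1),
    max_active_tasks=16,
)

# DAG.following_schedule(dttm) was removed; derive the next run from a
# DataInterval, mirroring the removed implementation shown above.
dttm = timezone.utcnow()
data_interval = dag.infer_automated_data_interval(timezone.coerce_datetime(dttm))
next_info = dag.next_dagrun_info(data_interval, restricted=False)
next_start = next_info.data_interval.start if next_info else None

# The is_paused property was removed; call the method (it reads the metadata DB).
paused = dag.get_is_paused()

The same pattern applies to ``DagModel.calculate_dagrun_date_fields``, which now also requires a ``DataInterval`` rather than a ``datetime``.
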
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index efb4dd8eda..804f32ae6e 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -23,7 +23,6 @@ import logging
 import os
 import pickle
 import re
-import sys
 import warnings
 import weakref
 from contextlib import redirect_stdout
@@ -39,8 +38,6 @@ import jinja2
 import pendulum
 import pytest
 import time_machine
-from dateutil.relativedelta import relativedelta
-from pendulum.tz.timezone import Timezone
 from sqlalchemy import inspect, select
 from sqlalchemy.exc import SAWarning
 
@@ -52,7 +49,6 @@ from airflow.exceptions import (
     AirflowException,
     DuplicateTaskIdFound,
     ParamValidationError,
-    RemovedInAirflow3Warning,
     UnknownExecutorException,
 )
 from airflow.executors import executor_loader
@@ -736,208 +732,6 @@ class TestDag:
 
         assert task.test_field == ["{{ ds }}", "some_string"]
 
-    def test_following_previous_schedule(self):
-        """
-        Make sure DST transitions are properly observed
-        """
-        local_tz = Timezone("Europe/Zurich")
-        start = local_tz.convert(datetime.datetime(2018, 10, 28, 2, 55, 
fold=0))
-        assert start.isoformat() == "2018-10-28T02:55:00+02:00", 
"Pre-condition: start date is in DST"
-
-        utc = timezone.convert_to_utc(start)
-        assert utc.isoformat() == "2018-10-28T00:55:00+00:00", "Pre-condition: 
correct DST->UTC conversion"
-
-        dag = DAG("tz_dag", start_date=start, schedule="*/5 * * * *")
-        with pytest.warns(
-            RemovedInAirflow3Warning,
-            match=r"`DAG.following_schedule\(\)` is deprecated. Use 
`DAG.next_dagrun_info\(restricted=False\)` instead.",
-        ):
-            _next = dag.following_schedule(utc)
-        next_local = local_tz.convert(_next)
-
-        assert _next.isoformat() == "2018-10-28T01:00:00+00:00"
-        assert next_local.isoformat() == "2018-10-28T02:00:00+01:00"
-
-        with pytest.warns(RemovedInAirflow3Warning, 
match=r"`DAG.previous_schedule\(\)` is deprecated."):
-            prev = dag.previous_schedule(utc)
-        prev_local = local_tz.convert(prev)
-
-        assert prev_local.isoformat() == "2018-10-28T02:50:00+02:00"
-
-        with pytest.warns(RemovedInAirflow3Warning, 
match=r"`DAG.previous_schedule\(\)` is deprecated."):
-            prev = dag.previous_schedule(_next)
-        prev_local = local_tz.convert(prev)
-
-        assert prev_local.isoformat() == "2018-10-28T02:55:00+02:00"
-        assert prev == utc
-
-    def test_following_previous_schedule_daily_dag_cest_to_cet(self):
-        """
-        Make sure DST transitions are properly observed
-        """
-        local_tz = pendulum.timezone("Europe/Zurich")
-        start = local_tz.convert(datetime.datetime(2018, 10, 27, 3, fold=0))
-
-        utc = timezone.convert_to_utc(start)
-
-        dag = DAG("tz_dag", start_date=start, schedule="0 3 * * *")
-
-        with pytest.warns(RemovedInAirflow3Warning, 
match=r"`DAG.previous_schedule\(\)` is deprecated."):
-            prev = dag.previous_schedule(utc)
-        prev_local = local_tz.convert(prev)
-
-        assert prev_local.isoformat() == "2018-10-26T03:00:00+02:00"
-        assert prev.isoformat() == "2018-10-26T01:00:00+00:00"
-
-        with pytest.warns(
-            RemovedInAirflow3Warning,
-            match=r"`DAG.following_schedule\(\)` is deprecated. Use 
`DAG.next_dagrun_info\(restricted=False\)` instead.",
-        ):
-            _next = dag.following_schedule(utc)
-        next_local = local_tz.convert(_next)
-
-        assert next_local.isoformat() == "2018-10-28T03:00:00+01:00"
-        assert _next.isoformat() == "2018-10-28T02:00:00+00:00"
-
-        with pytest.warns(RemovedInAirflow3Warning, 
match=r"`DAG.previous_schedule\(\)` is deprecated."):
-            prev = dag.previous_schedule(_next)
-        prev_local = local_tz.convert(prev)
-
-        assert prev_local.isoformat() == "2018-10-27T03:00:00+02:00"
-        assert prev.isoformat() == "2018-10-27T01:00:00+00:00"
-
-    def test_following_previous_schedule_daily_dag_cet_to_cest(self):
-        """
-        Make sure DST transitions are properly observed
-        """
-        local_tz = pendulum.timezone("Europe/Zurich")
-        start = local_tz.convert(datetime.datetime(2018, 3, 25, 2, fold=0))
-
-        utc = timezone.convert_to_utc(start)
-
-        dag = DAG("tz_dag", start_date=start, schedule="0 3 * * *")
-
-        with pytest.warns(RemovedInAirflow3Warning, 
match=r"`DAG.previous_schedule\(\)` is deprecated."):
-            prev = dag.previous_schedule(utc)
-        prev_local = local_tz.convert(prev)
-
-        assert prev_local.isoformat() == "2018-03-24T03:00:00+01:00"
-        assert prev.isoformat() == "2018-03-24T02:00:00+00:00"
-
-        with pytest.warns(
-            RemovedInAirflow3Warning,
-            match=r"`DAG.following_schedule\(\)` is deprecated. Use 
`DAG.next_dagrun_info\(restricted=False\)` instead.",
-        ):
-            _next = dag.following_schedule(utc)
-        next_local = local_tz.convert(_next)
-
-        assert next_local.isoformat() == "2018-03-25T03:00:00+02:00"
-        assert _next.isoformat() == "2018-03-25T01:00:00+00:00"
-
-        with pytest.warns(RemovedInAirflow3Warning, 
match=r"`DAG.previous_schedule\(\)` is deprecated."):
-            prev = dag.previous_schedule(_next)
-        prev_local = local_tz.convert(prev)
-
-        assert prev_local.isoformat() == "2018-03-24T03:00:00+01:00"
-        assert prev.isoformat() == "2018-03-24T02:00:00+00:00"
-
-    def test_following_schedule_relativedelta(self):
-        """
-        Tests following_schedule a dag with a relativedelta schedule
-        """
-        dag_id = "test_schedule_dag_relativedelta"
-        delta = relativedelta(hours=+1)
-        dag = DAG(dag_id=dag_id, schedule=delta, start_date=TEST_DATE)
-        dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", 
start_date=TEST_DATE))
-
-        with pytest.warns(
-            RemovedInAirflow3Warning,
-            match=r"`DAG.following_schedule\(\)` is deprecated. Use 
`DAG.next_dagrun_info\(restricted=False\)` instead.",
-        ):
-            _next = dag.following_schedule(TEST_DATE)
-        assert _next.isoformat() == "2015-01-02T01:00:00+00:00"
-
-        with pytest.warns(
-            RemovedInAirflow3Warning,
-            match=r"`DAG.following_schedule\(\)` is deprecated. Use 
`DAG.next_dagrun_info\(restricted=False\)` instead.",
-        ):
-            _next = dag.following_schedule(_next)
-        assert _next.isoformat() == "2015-01-02T02:00:00+00:00"
-
-    def test_following_schedule_relativedelta_with_deprecated_schedule(self):
-        """
-        Tests following_schedule a dag with a relativedelta schedule
-        """
-        dag_id = "test_schedule_dag_relativedelta"
-        delta = relativedelta(hours=+1)
-        dag = DAG(dag_id=dag_id, schedule=delta, start_date=TEST_DATE)
-        dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", 
start_date=TEST_DATE))
-
-        with pytest.warns(
-            RemovedInAirflow3Warning,
-            match=r"`DAG.following_schedule\(\)` is deprecated. Use 
`DAG.next_dagrun_info\(restricted=False\)` instead.",
-        ):
-            _next = dag.following_schedule(TEST_DATE)
-        assert _next.isoformat() == "2015-01-02T01:00:00+00:00"
-
-        with pytest.warns(
-            RemovedInAirflow3Warning,
-            match=r"`DAG.following_schedule\(\)` is deprecated. Use 
`DAG.next_dagrun_info\(restricted=False\)` instead.",
-        ):
-            _next = dag.following_schedule(_next)
-        assert _next.isoformat() == "2015-01-02T02:00:00+00:00"
-
-    def 
test_following_schedule_relativedelta_with_depr_schedule_decorated_dag(self):
-        """
-        Tests following_schedule a dag with a relativedelta schedule
-        using decorated dag
-        """
-        from airflow.decorators import dag
-
-        dag_id = "test_schedule_dag_relativedelta"
-        delta = relativedelta(hours=+1)
-
-        @dag(dag_id=dag_id, schedule=delta, start_date=TEST_DATE)
-        def mydag():
-            BaseOperator(task_id="faketastic", owner="Also fake", 
start_date=TEST_DATE)
-
-        _dag = mydag()
-
-        with pytest.warns(
-            RemovedInAirflow3Warning,
-            match=r"`DAG.following_schedule\(\)` is deprecated. Use 
`DAG.next_dagrun_info\(restricted=False\)` instead.",
-        ):
-            _next = _dag.following_schedule(TEST_DATE)
-        assert _next.isoformat() == "2015-01-02T01:00:00+00:00"
-
-        with pytest.warns(
-            RemovedInAirflow3Warning,
-            match=r"`DAG.following_schedule\(\)` is deprecated. Use 
`DAG.next_dagrun_info\(restricted=False\)` instead.",
-        ):
-            _next = _dag.following_schedule(_next)
-        assert _next.isoformat() == "2015-01-02T02:00:00+00:00"
-
-    def test_previous_schedule_datetime_timezone(self):
-        # Check that we don't get an AttributeError 'name' for self.timezone
-
-        start = datetime.datetime(2018, 3, 25, 2, tzinfo=datetime.timezone.utc)
-        dag = DAG("tz_dag", start_date=start, schedule="@hourly")
-        with pytest.warns(RemovedInAirflow3Warning, 
match=r"`DAG.previous_schedule\(\)` is deprecated."):
-            when = dag.previous_schedule(start)
-        assert when.isoformat() == "2018-03-25T01:00:00+00:00"
-
-    def test_following_schedule_datetime_timezone(self):
-        # Check that we don't get an AttributeError 'name' for self.timezone
-
-        start = datetime.datetime(2018, 3, 25, 2, tzinfo=datetime.timezone.utc)
-        dag = DAG("tz_dag", start_date=start, schedule="@hourly")
-        with pytest.warns(
-            RemovedInAirflow3Warning,
-            match=r"`DAG.following_schedule\(\)` is deprecated. Use 
`DAG.next_dagrun_info\(restricted=False\)` instead.",
-        ):
-            when = dag.following_schedule(start)
-        assert when.isoformat() == "2018-03-25T03:00:00+00:00"
-
     def test_create_dagrun_when_schedule_is_none_and_empty_start_date(self):
         # Check that we don't get an AttributeError 'start_date' for 
self.start_date when schedule is none
         dag = DAG("dag_with_none_schedule_and_empty_start_date", schedule=None)
@@ -955,34 +749,6 @@ class TestDag:
         with pytest.raises(ValueError, match="start_date is required when 
catchup=True"):
             DAG(dag_id="dag_with_non_none_schedule_and_empty_start_date", 
schedule="@hourly", catchup=True)
 
-    def test_following_schedule_datetime_timezone_utc0530(self):
-        # Check that we don't get an AttributeError 'name' for self.timezone
-        class UTC0530(datetime.tzinfo):
-            """tzinfo derived concrete class named "+0530" with offset of 
19800"""
-
-            # can be configured here
-            _offset = datetime.timedelta(seconds=19800)
-            _dst = datetime.timedelta(0)
-            _name = "+0530"
-
-            def utcoffset(self, dt):
-                return self.__class__._offset
-
-            def dst(self, dt):
-                return self.__class__._dst
-
-            def tzname(self, dt):
-                return self.__class__._name
-
-        start = datetime.datetime(2018, 3, 25, 10, tzinfo=UTC0530())
-        dag = DAG("tz_dag", start_date=start, schedule="@hourly")
-        with pytest.warns(
-            RemovedInAirflow3Warning,
-            match=r"`DAG.following_schedule\(\)` is deprecated. Use 
`DAG.next_dagrun_info\(restricted=False\)` instead.",
-        ):
-            when = dag.following_schedule(start)
-        assert when.isoformat() == "2018-03-25T05:30:00+00:00"
-
     def test_dagtag_repr(self):
         clear_db_dags()
         dag = DAG("dag-test-dagtag", schedule=None, start_date=DEFAULT_DATE, 
tags=["tag-1", "tag-2"])
@@ -2260,38 +2026,6 @@ my_postgres_conn:
         next_info = dag.next_dagrun_info(next_info.data_interval)
         assert next_info is None
 
-    def test_next_dagrun_info_start_end_dates(self):
-        """
-        Tests that an attempt to schedule a task after the Dag's end_date
-        does not succeed.
-        """
-        delta = datetime.timedelta(hours=1)
-        runs = 3
-        start_date = DEFAULT_DATE
-        end_date = start_date + (runs - 1) * delta
-        dag_id = "test_schedule_dag_start_end_dates"
-        dag = DAG(dag_id=dag_id, start_date=start_date, end_date=end_date, 
schedule=delta)
-        dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake"))
-
-        # Create and schedule the dag runs
-        dates = []
-        interval = None
-        for _ in range(runs):
-            next_info = dag.next_dagrun_info(interval)
-            if next_info is None:
-                dates.append(None)
-            else:
-                interval = next_info.data_interval
-                dates.append(interval.start)
-
-        assert all(date is not None for date in dates)
-        assert dates[-1] == end_date
-        with pytest.warns(
-            RemovedInAirflow3Warning,
-            match="Passing a datetime to DAG.next_dagrun_info is deprecated. 
Use a DataInterval instead.",
-        ):
-            assert dag.next_dagrun_info(interval.start) is None
-
     def test_next_dagrun_info_catchup(self):
         """
         Test to check that a DAG with catchup = False only schedules beginning 
now, not back to the start date
@@ -2489,32 +2223,6 @@ my_postgres_conn:
         assert next_info.data_interval.start == timezone.datetime(2028, 2, 29)
         assert next_info.data_interval.end == timezone.datetime(2032, 2, 29)
 
-    def test_replace_outdated_access_control_actions(self):
-        outdated_permissions = {
-            "role1": {permissions.ACTION_CAN_READ, 
permissions.ACTION_CAN_EDIT},
-            "role2": {permissions.DEPRECATED_ACTION_CAN_DAG_READ, 
permissions.DEPRECATED_ACTION_CAN_DAG_EDIT},
-            "role3": {permissions.RESOURCE_DAG_RUN: 
{permissions.ACTION_CAN_CREATE}},
-        }
-        updated_permissions = {
-            "role1": {permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, 
permissions.ACTION_CAN_EDIT}},
-            "role2": {permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, 
permissions.ACTION_CAN_EDIT}},
-            "role3": {permissions.RESOURCE_DAG_RUN: 
{permissions.ACTION_CAN_CREATE}},
-        }
-
-        with pytest.warns(DeprecationWarning) as deprecation_warnings:
-            dag = DAG(dag_id="dag_with_outdated_perms", schedule=None, 
access_control=outdated_permissions)
-        assert dag.access_control == updated_permissions
-        assert len(deprecation_warnings) == 2
-        assert "permission is deprecated" in 
str(deprecation_warnings[0].message)
-        assert "permission is deprecated" in 
str(deprecation_warnings[1].message)
-
-        with pytest.warns(DeprecationWarning) as deprecation_warnings:
-            dag.access_control = outdated_permissions
-        assert dag.access_control == updated_permissions
-        assert len(deprecation_warnings) == 2
-        assert "permission is deprecated" in 
str(deprecation_warnings[0].message)
-        assert "permission is deprecated" in 
str(deprecation_warnings[1].message)
-
     @pytest.mark.parametrize(
         "fab_version, perms, expected_exception, expected_perms",
         [
@@ -3213,20 +2921,6 @@ class TestDagDecorator:
         dag = xcom_pass_to_op()
         assert dag.params["value"] == value
 
-    @pytest.mark.xfail(strict=True, reason="Airflow 3.0 should not have 
warnings here anymore")
-    def test_warning_location(self):
-        # NOTE: This only works as long as there is some warning we can emit 
from `DAG()`
-        @dag_decorator(schedule=None)
-        def mydag(): ...
-
-        with pytest.warns(RemovedInAirflow3Warning) as warnings:
-            line = sys._getframe().f_lineno + 1
-            mydag()
-
-        w = warnings.pop(RemovedInAirflow3Warning)
-        assert w.filename == __file__
-        assert w.lineno == line
-
 
 @pytest.mark.parametrize(
     "run_id, execution_date",
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index f7aae9ce8a..8311aa77e7 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -317,8 +317,8 @@ def make_user_defined_macro_filter_dag():
         (2) templates with function macros have been rendered before 
serialization.
     """
 
-    def compute_next_execution_date(dag, execution_date):
-        return dag.following_schedule(execution_date)
+    def compute_last_dagrun(dag: DAG):
+        return dag.get_last_dagrun(include_externally_triggered=True)
 
     default_args = {"start_date": datetime(2019, 7, 10)}
     dag = DAG(
@@ -326,14 +326,14 @@ def make_user_defined_macro_filter_dag():
         schedule=None,
         default_args=default_args,
         user_defined_macros={
-            "next_execution_date": compute_next_execution_date,
+            "last_dagrun": compute_last_dagrun,
         },
         user_defined_filters={"hello": lambda name: f"Hello {name}"},
         catchup=False,
     )
     BashOperator(
         task_id="echo",
-        bash_command='echo "{{ next_execution_date(dag, execution_date) }}"',
+        bash_command='echo "{{ last_dagrun(dag) }}"',
         dag=dag,
     )
     return {dag.dag_id: dag}

