This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5ab79b542b Remove deprecations in airflow.models.dag (#41774)
5ab79b542b is described below
commit 5ab79b542bee044eed8ba97b752fa9005d432d0f
Author: Jens Scheffler <[email protected]>
AuthorDate: Thu Aug 29 00:25:33 2024 +0200
Remove deprecations in airflow.models.dag (#41774)
---
airflow/models/dag.py | 171 ++------------
airflow/models/taskinstance.py | 10 +-
newsfragments/41774.significant.rst | 12 +
tests/models/test_dag.py | 306 --------------------------
tests/serialization/test_dag_serialization.py | 8 +-
5 files changed, 37 insertions(+), 470 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 93ff43711e..96d6216c5d 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -28,7 +28,6 @@ import pickle
import sys
import time
import traceback
-import warnings
import weakref
from collections import abc, defaultdict, deque
from contextlib import ExitStack
@@ -90,7 +89,6 @@ from airflow.exceptions import (
DuplicateTaskIdFound,
FailStopDagInvalidTriggerRule,
ParamValidationError,
- RemovedInAirflow3Warning,
TaskDeferred,
TaskNotFound,
UnknownExecutorException,
@@ -511,13 +509,11 @@ class DAG(LoggingMixin):
schedule: ScheduleArg = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
- full_filepath: str | None = None,
template_searchpath: str | Iterable[str] | None = None,
template_undefined: type[jinja2.StrictUndefined] =
jinja2.StrictUndefined,
user_defined_macros: dict | None = None,
user_defined_filters: dict | None = None,
default_args: dict | None = None,
- concurrency: int | None = None,
max_active_tasks: int = airflow_conf.getint("core",
"max_active_tasks_per_dag"),
max_active_runs: int = airflow_conf.getint("core",
"max_active_runs_per_dag"),
max_consecutive_failed_dag_runs: int = airflow_conf.getint(
@@ -563,26 +559,11 @@ class DAG(LoggingMixin):
# check self.params and convert them into ParamsDict
self.params = ParamsDict(params)
- if full_filepath:
- warnings.warn(
- "Passing full_filepath to DAG() is deprecated and has no
effect",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
-
validate_key(dag_id)
self._dag_id = dag_id
self._dag_display_property_value = dag_display_name
- if concurrency:
- # TODO: Remove in Airflow 3.0
- warnings.warn(
- "The 'concurrency' parameter is deprecated. Please use
'max_active_tasks'.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- max_active_tasks = concurrency
self._max_active_tasks = max_active_tasks
self._pickle_id: int | None = None
@@ -662,13 +643,6 @@ class DAG(LoggingMixin):
self.sla_miss_callback = sla_miss_callback
if default_view in DEFAULT_VIEW_PRESETS:
self._default_view: str = default_view
- elif default_view == "tree":
- warnings.warn(
- "`default_view` of 'tree' has been renamed to 'grid' -- please
update your DAG",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- self._default_view = "grid"
else:
raise AirflowException(
f"Invalid values of dag.default_view: only support "
@@ -819,30 +793,9 @@ class DAG(LoggingMixin):
@staticmethod
def _upgrade_outdated_dag_access_control(access_control=None):
- """
- Look for outdated dag level actions in DAG access_controls and replace
them with updated actions.
-
- For example, in DAG access_control {'role1': {'can_dag_read'}}
'can_dag_read'
- will be replaced with 'can_read', in {'role2': {'can_dag_read',
'can_dag_edit'}}
- 'can_dag_edit' will be replaced with 'can_edit', etc.
- """
+ """Look for outdated dag level actions in DAG access_controls and
replace them with updated actions."""
if access_control is None:
return None
- new_dag_perm_mapping = {
- permissions.DEPRECATED_ACTION_CAN_DAG_READ:
permissions.ACTION_CAN_READ,
- permissions.DEPRECATED_ACTION_CAN_DAG_EDIT:
permissions.ACTION_CAN_EDIT,
- }
-
- def update_old_perm(permission: str):
- new_perm = new_dag_perm_mapping.get(permission, permission)
- if new_perm != permission:
- warnings.warn(
- f"The '{permission}' permission is deprecated. Please use
'{new_perm}'.",
- RemovedInAirflow3Warning,
- stacklevel=3,
- )
- return new_perm
-
updated_access_control = {}
for role, perms in access_control.items():
if packaging_version.parse(FAB_VERSION) >=
packaging_version.parse("1.3.0"):
@@ -852,11 +805,6 @@ class DAG(LoggingMixin):
updated_access_control[role][permissions.RESOURCE_DAG] =
set(perms)
else:
updated_access_control[role] = perms
- if permissions.RESOURCE_DAG in updated_access_control[role]:
- updated_access_control[role][permissions.RESOURCE_DAG] = {
- update_old_perm(perm)
- for perm in
updated_access_control[role][permissions.RESOURCE_DAG]
- }
elif isinstance(perms, dict):
# Not allow new access control format with old FAB versions
raise AirflowException(
@@ -864,40 +812,10 @@ class DAG(LoggingMixin):
"use the Dag Level Access Control new format."
)
else:
- updated_access_control[role] = {update_old_perm(perm) for perm
in perms}
+ updated_access_control[role] = set(perms)
return updated_access_control
- def following_schedule(self, dttm):
- """
- Calculate the following schedule for this dag in UTC.
-
- :param dttm: utc datetime
- :return: utc datetime
- """
- warnings.warn(
- "`DAG.following_schedule()` is deprecated. Use
`DAG.next_dagrun_info(restricted=False)` instead.",
- category=RemovedInAirflow3Warning,
- stacklevel=2,
- )
- data_interval =
self.infer_automated_data_interval(timezone.coerce_datetime(dttm))
- next_info = self.next_dagrun_info(data_interval, restricted=False)
- if next_info is None:
- return None
- return next_info.data_interval.start
-
- def previous_schedule(self, dttm):
- from airflow.timetables.interval import _DataIntervalTimetable
-
- warnings.warn(
- "`DAG.previous_schedule()` is deprecated.",
- category=RemovedInAirflow3Warning,
- stacklevel=2,
- )
- if not isinstance(self.timetable, _DataIntervalTimetable):
- return None
- return self.timetable._get_prev(timezone.coerce_datetime(dttm))
-
def get_next_data_interval(self, dag_model: DagModel) -> DataInterval |
None:
"""
Get the data interval of the next scheduled run.
@@ -979,7 +897,7 @@ class DAG(LoggingMixin):
def next_dagrun_info(
self,
- last_automated_dagrun: None | datetime | DataInterval,
+ last_automated_dagrun: None | DataInterval,
*,
restricted: bool = True,
) -> DagRunInfo | None:
@@ -1004,16 +922,10 @@ class DAG(LoggingMixin):
"""
data_interval = None
if isinstance(last_automated_dagrun, datetime):
- warnings.warn(
- "Passing a datetime to DAG.next_dagrun_info is deprecated. Use
a DataInterval instead.",
- RemovedInAirflow3Warning,
- stacklevel=2,
+ raise ValueError(
+ "Passing a datetime to DAG.next_dagrun_info is not supported
anymore. Use a DataInterval instead."
)
- data_interval = self.infer_automated_data_interval(
- timezone.coerce_datetime(last_automated_dagrun)
- )
- else:
- data_interval = last_automated_dagrun
+ data_interval = last_automated_dagrun
if restricted:
restriction = self._time_restriction
else:
@@ -1274,16 +1186,6 @@ class DAG(LoggingMixin):
"""Return a boolean indicating whether this DAG is paused."""
return session.scalar(select(DagModel.is_paused).where(DagModel.dag_id
== self.dag_id))
- @property
- def is_paused(self):
- """Use `airflow.models.DAG.get_is_paused`, this attribute is
deprecated."""
- warnings.warn(
- "This attribute is deprecated. Please use
`airflow.models.DAG.get_is_paused` method.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- return self.get_is_paused()
-
@staticmethod
@internal_api_call
@provide_session
@@ -2009,9 +1911,6 @@ class DAG(LoggingMixin):
dag_run_state: DagRunState = DagRunState.QUEUED,
dry_run: bool = False,
session: Session = NEW_SESSION,
- get_tis: bool = False,
- recursion_depth: int = 0,
- max_recursion_depth: int | None = None,
dag_bag: DagBag | None = None,
exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None =
frozenset(),
) -> int | Iterable[TaskInstance]:
@@ -2032,27 +1931,6 @@ class DAG(LoggingMixin):
:param exclude_task_ids: A set of ``task_id`` or (``task_id``,
``map_index``)
tuples that should not be cleared
"""
- if get_tis:
- warnings.warn(
- "Passing `get_tis` to dag.clear() is deprecated. Use `dry_run`
parameter instead.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- dry_run = True
-
- if recursion_depth:
- warnings.warn(
- "Passing `recursion_depth` to dag.clear() is deprecated.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- if max_recursion_depth:
- warnings.warn(
- "Passing `max_recursion_depth` to dag.clear() is deprecated.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
-
state: list[TaskInstanceState] = []
if only_failed:
state += [TaskInstanceState.FAILED,
TaskInstanceState.UPSTREAM_FAILED]
@@ -2706,15 +2584,9 @@ class DAG(LoggingMixin):
data_interval = DataInterval(*map(timezone.coerce_datetime,
data_interval))
if data_interval is None and logical_date is not None:
- warnings.warn(
- "Calling `DAG.create_dagrun()` without an explicit data
interval is deprecated",
- RemovedInAirflow3Warning,
- stacklevel=3,
+ raise ValueError(
+ "Calling `DAG.create_dagrun()` without an explicit data
interval is not supported."
)
- if run_type == DagRunType.MANUAL:
- data_interval =
self.timetable.infer_manual_data_interval(run_after=logical_date)
- else:
- data_interval =
self.infer_automated_data_interval(logical_date)
if run_type is None or isinstance(run_type, DagRunType):
pass
@@ -3410,18 +3282,10 @@ class DagModel(Base):
"scheduler", "max_dagruns_to_create_per_loop", fallback=10
)
- def __init__(self, concurrency=None, **kwargs):
+ def __init__(self, **kwargs):
super().__init__(**kwargs)
if self.max_active_tasks is None:
- if concurrency:
- warnings.warn(
- "The 'DagModel.concurrency' parameter is deprecated.
Please use 'max_active_tasks'.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- self.max_active_tasks = concurrency
- else:
- self.max_active_tasks = airflow_conf.getint("core",
"max_active_tasks_per_dag")
+ self.max_active_tasks = airflow_conf.getint("core",
"max_active_tasks_per_dag")
if self.max_active_runs is None:
self.max_active_runs = airflow_conf.getint("core",
"max_active_runs_per_dag")
@@ -3669,7 +3533,7 @@ class DagModel(Base):
def calculate_dagrun_date_fields(
self,
dag: DAG,
- last_automated_dag_run: None | datetime | DataInterval,
+ last_automated_dag_run: None | DataInterval,
) -> None:
"""
Calculate ``next_dagrun`` and `next_dagrun_create_after``.
@@ -3680,13 +3544,10 @@ class DagModel(Base):
"""
last_automated_data_interval: DataInterval | None
if isinstance(last_automated_dag_run, datetime):
- warnings.warn(
- "Passing a datetime to `DagModel.calculate_dagrun_date_fields`
is deprecated. "
- "Provide a data interval instead.",
- RemovedInAirflow3Warning,
- stacklevel=2,
+ raise ValueError(
+ "Passing a datetime to `DagModel.calculate_dagrun_date_fields`
is not supported. "
+ "Provide a data interval instead."
)
- last_automated_data_interval =
dag.infer_automated_data_interval(last_automated_dag_run)
else:
last_automated_data_interval = last_automated_dag_run
next_dagrun_info = dag.next_dagrun_info(last_automated_data_interval)
@@ -3719,13 +3580,11 @@ def dag(
schedule: ScheduleArg = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
- full_filepath: str | None = None,
template_searchpath: str | Iterable[str] | None = None,
template_undefined: type[jinja2.StrictUndefined] = jinja2.StrictUndefined,
user_defined_macros: dict | None = None,
user_defined_filters: dict | None = None,
default_args: dict | None = None,
- concurrency: int | None = None,
max_active_tasks: int = airflow_conf.getint("core",
"max_active_tasks_per_dag"),
max_active_runs: int = airflow_conf.getint("core",
"max_active_runs_per_dag"),
max_consecutive_failed_dag_runs: int = airflow_conf.getint(
@@ -3775,13 +3634,11 @@ def dag(
description=description,
start_date=start_date,
end_date=end_date,
- full_filepath=full_filepath,
template_searchpath=template_searchpath,
template_undefined=template_undefined,
user_defined_macros=user_defined_macros,
user_defined_filters=user_defined_filters,
default_args=default_args,
- concurrency=concurrency,
max_active_tasks=max_active_tasks,
max_active_runs=max_active_runs,
max_consecutive_failed_dag_runs=max_consecutive_failed_dag_runs,
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 1d3909aed4..0d633f8bf3 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1048,9 +1048,13 @@ def _get_template_context(
# for manually triggered tasks, i.e. triggered_date == execution_date.
if dag_run.external_trigger:
return logical_date
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", RemovedInAirflow3Warning)
- return dag.previous_schedule(logical_date)
+
+ # Workaround code copy until deprecated context fields are removed in
Airflow 3
+ from airflow.timetables.interval import _DataIntervalTimetable
+
+ if not isinstance(dag.timetable, _DataIntervalTimetable):
+ return None
+ return dag.timetable._get_prev(timezone.coerce_datetime(logical_date))
@cache
def get_prev_ds() -> str | None:
diff --git a/newsfragments/41774.significant.rst
b/newsfragments/41774.significant.rst
new file mode 100644
index 0000000000..2d6456fb14
--- /dev/null
+++ b/newsfragments/41774.significant.rst
@@ -0,0 +1,12 @@
+Removed a set of deprecations in ``DAG`` from ``airflow.models``.
+
+- Removed deprecated parameters ``full_filepath`` and ``concurrency``
(replaced by ``max_active_tasks``) from ``DAG`` and the ``@dag`` decorator.
+- Removed fallback support for legacy ``default_view == "tree"``.
+- Removed legacy support for permissions named ``can_dag_read`` and
``can_dag_edit``. The permissions need to be named ``can_read`` and
``can_edit``.
+- Removed legacy deprecated functions ``following_schedule()`` and
``previous_schedule()``.
+- Removed deprecated support for ``datetime`` in ``next_dagrun_info()``. Use
``DataInterval``.
+- Removed legacy DAG property ``is_paused``. Please use ``get_is_paused``
instead.
+- Removed legacy parameters ``get_tis``, ``recursion_depth`` and
``max_recursion_depth`` from ``DAG.clear()``.
+- Removed implicit support for calling ``create_dagrun()`` without a data interval.
+- Removed support for deprecated parameter ``concurrency`` in ``DagModel``.
+- Removed support for ``datetime`` in
``DagModel.calculate_dagrun_date_fields``. Use ``DataInterval``.
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index efb4dd8eda..804f32ae6e 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -23,7 +23,6 @@ import logging
import os
import pickle
import re
-import sys
import warnings
import weakref
from contextlib import redirect_stdout
@@ -39,8 +38,6 @@ import jinja2
import pendulum
import pytest
import time_machine
-from dateutil.relativedelta import relativedelta
-from pendulum.tz.timezone import Timezone
from sqlalchemy import inspect, select
from sqlalchemy.exc import SAWarning
@@ -52,7 +49,6 @@ from airflow.exceptions import (
AirflowException,
DuplicateTaskIdFound,
ParamValidationError,
- RemovedInAirflow3Warning,
UnknownExecutorException,
)
from airflow.executors import executor_loader
@@ -736,208 +732,6 @@ class TestDag:
assert task.test_field == ["{{ ds }}", "some_string"]
- def test_following_previous_schedule(self):
- """
- Make sure DST transitions are properly observed
- """
- local_tz = Timezone("Europe/Zurich")
- start = local_tz.convert(datetime.datetime(2018, 10, 28, 2, 55,
fold=0))
- assert start.isoformat() == "2018-10-28T02:55:00+02:00",
"Pre-condition: start date is in DST"
-
- utc = timezone.convert_to_utc(start)
- assert utc.isoformat() == "2018-10-28T00:55:00+00:00", "Pre-condition:
correct DST->UTC conversion"
-
- dag = DAG("tz_dag", start_date=start, schedule="*/5 * * * *")
- with pytest.warns(
- RemovedInAirflow3Warning,
- match=r"`DAG.following_schedule\(\)` is deprecated. Use
`DAG.next_dagrun_info\(restricted=False\)` instead.",
- ):
- _next = dag.following_schedule(utc)
- next_local = local_tz.convert(_next)
-
- assert _next.isoformat() == "2018-10-28T01:00:00+00:00"
- assert next_local.isoformat() == "2018-10-28T02:00:00+01:00"
-
- with pytest.warns(RemovedInAirflow3Warning,
match=r"`DAG.previous_schedule\(\)` is deprecated."):
- prev = dag.previous_schedule(utc)
- prev_local = local_tz.convert(prev)
-
- assert prev_local.isoformat() == "2018-10-28T02:50:00+02:00"
-
- with pytest.warns(RemovedInAirflow3Warning,
match=r"`DAG.previous_schedule\(\)` is deprecated."):
- prev = dag.previous_schedule(_next)
- prev_local = local_tz.convert(prev)
-
- assert prev_local.isoformat() == "2018-10-28T02:55:00+02:00"
- assert prev == utc
-
- def test_following_previous_schedule_daily_dag_cest_to_cet(self):
- """
- Make sure DST transitions are properly observed
- """
- local_tz = pendulum.timezone("Europe/Zurich")
- start = local_tz.convert(datetime.datetime(2018, 10, 27, 3, fold=0))
-
- utc = timezone.convert_to_utc(start)
-
- dag = DAG("tz_dag", start_date=start, schedule="0 3 * * *")
-
- with pytest.warns(RemovedInAirflow3Warning,
match=r"`DAG.previous_schedule\(\)` is deprecated."):
- prev = dag.previous_schedule(utc)
- prev_local = local_tz.convert(prev)
-
- assert prev_local.isoformat() == "2018-10-26T03:00:00+02:00"
- assert prev.isoformat() == "2018-10-26T01:00:00+00:00"
-
- with pytest.warns(
- RemovedInAirflow3Warning,
- match=r"`DAG.following_schedule\(\)` is deprecated. Use
`DAG.next_dagrun_info\(restricted=False\)` instead.",
- ):
- _next = dag.following_schedule(utc)
- next_local = local_tz.convert(_next)
-
- assert next_local.isoformat() == "2018-10-28T03:00:00+01:00"
- assert _next.isoformat() == "2018-10-28T02:00:00+00:00"
-
- with pytest.warns(RemovedInAirflow3Warning,
match=r"`DAG.previous_schedule\(\)` is deprecated."):
- prev = dag.previous_schedule(_next)
- prev_local = local_tz.convert(prev)
-
- assert prev_local.isoformat() == "2018-10-27T03:00:00+02:00"
- assert prev.isoformat() == "2018-10-27T01:00:00+00:00"
-
- def test_following_previous_schedule_daily_dag_cet_to_cest(self):
- """
- Make sure DST transitions are properly observed
- """
- local_tz = pendulum.timezone("Europe/Zurich")
- start = local_tz.convert(datetime.datetime(2018, 3, 25, 2, fold=0))
-
- utc = timezone.convert_to_utc(start)
-
- dag = DAG("tz_dag", start_date=start, schedule="0 3 * * *")
-
- with pytest.warns(RemovedInAirflow3Warning,
match=r"`DAG.previous_schedule\(\)` is deprecated."):
- prev = dag.previous_schedule(utc)
- prev_local = local_tz.convert(prev)
-
- assert prev_local.isoformat() == "2018-03-24T03:00:00+01:00"
- assert prev.isoformat() == "2018-03-24T02:00:00+00:00"
-
- with pytest.warns(
- RemovedInAirflow3Warning,
- match=r"`DAG.following_schedule\(\)` is deprecated. Use
`DAG.next_dagrun_info\(restricted=False\)` instead.",
- ):
- _next = dag.following_schedule(utc)
- next_local = local_tz.convert(_next)
-
- assert next_local.isoformat() == "2018-03-25T03:00:00+02:00"
- assert _next.isoformat() == "2018-03-25T01:00:00+00:00"
-
- with pytest.warns(RemovedInAirflow3Warning,
match=r"`DAG.previous_schedule\(\)` is deprecated."):
- prev = dag.previous_schedule(_next)
- prev_local = local_tz.convert(prev)
-
- assert prev_local.isoformat() == "2018-03-24T03:00:00+01:00"
- assert prev.isoformat() == "2018-03-24T02:00:00+00:00"
-
- def test_following_schedule_relativedelta(self):
- """
- Tests following_schedule a dag with a relativedelta schedule
- """
- dag_id = "test_schedule_dag_relativedelta"
- delta = relativedelta(hours=+1)
- dag = DAG(dag_id=dag_id, schedule=delta, start_date=TEST_DATE)
- dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake",
start_date=TEST_DATE))
-
- with pytest.warns(
- RemovedInAirflow3Warning,
- match=r"`DAG.following_schedule\(\)` is deprecated. Use
`DAG.next_dagrun_info\(restricted=False\)` instead.",
- ):
- _next = dag.following_schedule(TEST_DATE)
- assert _next.isoformat() == "2015-01-02T01:00:00+00:00"
-
- with pytest.warns(
- RemovedInAirflow3Warning,
- match=r"`DAG.following_schedule\(\)` is deprecated. Use
`DAG.next_dagrun_info\(restricted=False\)` instead.",
- ):
- _next = dag.following_schedule(_next)
- assert _next.isoformat() == "2015-01-02T02:00:00+00:00"
-
- def test_following_schedule_relativedelta_with_deprecated_schedule(self):
- """
- Tests following_schedule a dag with a relativedelta schedule
- """
- dag_id = "test_schedule_dag_relativedelta"
- delta = relativedelta(hours=+1)
- dag = DAG(dag_id=dag_id, schedule=delta, start_date=TEST_DATE)
- dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake",
start_date=TEST_DATE))
-
- with pytest.warns(
- RemovedInAirflow3Warning,
- match=r"`DAG.following_schedule\(\)` is deprecated. Use
`DAG.next_dagrun_info\(restricted=False\)` instead.",
- ):
- _next = dag.following_schedule(TEST_DATE)
- assert _next.isoformat() == "2015-01-02T01:00:00+00:00"
-
- with pytest.warns(
- RemovedInAirflow3Warning,
- match=r"`DAG.following_schedule\(\)` is deprecated. Use
`DAG.next_dagrun_info\(restricted=False\)` instead.",
- ):
- _next = dag.following_schedule(_next)
- assert _next.isoformat() == "2015-01-02T02:00:00+00:00"
-
- def
test_following_schedule_relativedelta_with_depr_schedule_decorated_dag(self):
- """
- Tests following_schedule a dag with a relativedelta schedule
- using decorated dag
- """
- from airflow.decorators import dag
-
- dag_id = "test_schedule_dag_relativedelta"
- delta = relativedelta(hours=+1)
-
- @dag(dag_id=dag_id, schedule=delta, start_date=TEST_DATE)
- def mydag():
- BaseOperator(task_id="faketastic", owner="Also fake",
start_date=TEST_DATE)
-
- _dag = mydag()
-
- with pytest.warns(
- RemovedInAirflow3Warning,
- match=r"`DAG.following_schedule\(\)` is deprecated. Use
`DAG.next_dagrun_info\(restricted=False\)` instead.",
- ):
- _next = _dag.following_schedule(TEST_DATE)
- assert _next.isoformat() == "2015-01-02T01:00:00+00:00"
-
- with pytest.warns(
- RemovedInAirflow3Warning,
- match=r"`DAG.following_schedule\(\)` is deprecated. Use
`DAG.next_dagrun_info\(restricted=False\)` instead.",
- ):
- _next = _dag.following_schedule(_next)
- assert _next.isoformat() == "2015-01-02T02:00:00+00:00"
-
- def test_previous_schedule_datetime_timezone(self):
- # Check that we don't get an AttributeError 'name' for self.timezone
-
- start = datetime.datetime(2018, 3, 25, 2, tzinfo=datetime.timezone.utc)
- dag = DAG("tz_dag", start_date=start, schedule="@hourly")
- with pytest.warns(RemovedInAirflow3Warning,
match=r"`DAG.previous_schedule\(\)` is deprecated."):
- when = dag.previous_schedule(start)
- assert when.isoformat() == "2018-03-25T01:00:00+00:00"
-
- def test_following_schedule_datetime_timezone(self):
- # Check that we don't get an AttributeError 'name' for self.timezone
-
- start = datetime.datetime(2018, 3, 25, 2, tzinfo=datetime.timezone.utc)
- dag = DAG("tz_dag", start_date=start, schedule="@hourly")
- with pytest.warns(
- RemovedInAirflow3Warning,
- match=r"`DAG.following_schedule\(\)` is deprecated. Use
`DAG.next_dagrun_info\(restricted=False\)` instead.",
- ):
- when = dag.following_schedule(start)
- assert when.isoformat() == "2018-03-25T03:00:00+00:00"
-
def test_create_dagrun_when_schedule_is_none_and_empty_start_date(self):
# Check that we don't get an AttributeError 'start_date' for
self.start_date when schedule is none
dag = DAG("dag_with_none_schedule_and_empty_start_date", schedule=None)
@@ -955,34 +749,6 @@ class TestDag:
with pytest.raises(ValueError, match="start_date is required when
catchup=True"):
DAG(dag_id="dag_with_non_none_schedule_and_empty_start_date",
schedule="@hourly", catchup=True)
- def test_following_schedule_datetime_timezone_utc0530(self):
- # Check that we don't get an AttributeError 'name' for self.timezone
- class UTC0530(datetime.tzinfo):
- """tzinfo derived concrete class named "+0530" with offset of
19800"""
-
- # can be configured here
- _offset = datetime.timedelta(seconds=19800)
- _dst = datetime.timedelta(0)
- _name = "+0530"
-
- def utcoffset(self, dt):
- return self.__class__._offset
-
- def dst(self, dt):
- return self.__class__._dst
-
- def tzname(self, dt):
- return self.__class__._name
-
- start = datetime.datetime(2018, 3, 25, 10, tzinfo=UTC0530())
- dag = DAG("tz_dag", start_date=start, schedule="@hourly")
- with pytest.warns(
- RemovedInAirflow3Warning,
- match=r"`DAG.following_schedule\(\)` is deprecated. Use
`DAG.next_dagrun_info\(restricted=False\)` instead.",
- ):
- when = dag.following_schedule(start)
- assert when.isoformat() == "2018-03-25T05:30:00+00:00"
-
def test_dagtag_repr(self):
clear_db_dags()
dag = DAG("dag-test-dagtag", schedule=None, start_date=DEFAULT_DATE,
tags=["tag-1", "tag-2"])
@@ -2260,38 +2026,6 @@ my_postgres_conn:
next_info = dag.next_dagrun_info(next_info.data_interval)
assert next_info is None
- def test_next_dagrun_info_start_end_dates(self):
- """
- Tests that an attempt to schedule a task after the Dag's end_date
- does not succeed.
- """
- delta = datetime.timedelta(hours=1)
- runs = 3
- start_date = DEFAULT_DATE
- end_date = start_date + (runs - 1) * delta
- dag_id = "test_schedule_dag_start_end_dates"
- dag = DAG(dag_id=dag_id, start_date=start_date, end_date=end_date,
schedule=delta)
- dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake"))
-
- # Create and schedule the dag runs
- dates = []
- interval = None
- for _ in range(runs):
- next_info = dag.next_dagrun_info(interval)
- if next_info is None:
- dates.append(None)
- else:
- interval = next_info.data_interval
- dates.append(interval.start)
-
- assert all(date is not None for date in dates)
- assert dates[-1] == end_date
- with pytest.warns(
- RemovedInAirflow3Warning,
- match="Passing a datetime to DAG.next_dagrun_info is deprecated.
Use a DataInterval instead.",
- ):
- assert dag.next_dagrun_info(interval.start) is None
-
def test_next_dagrun_info_catchup(self):
"""
Test to check that a DAG with catchup = False only schedules beginning
now, not back to the start date
@@ -2489,32 +2223,6 @@ my_postgres_conn:
assert next_info.data_interval.start == timezone.datetime(2028, 2, 29)
assert next_info.data_interval.end == timezone.datetime(2032, 2, 29)
- def test_replace_outdated_access_control_actions(self):
- outdated_permissions = {
- "role1": {permissions.ACTION_CAN_READ,
permissions.ACTION_CAN_EDIT},
- "role2": {permissions.DEPRECATED_ACTION_CAN_DAG_READ,
permissions.DEPRECATED_ACTION_CAN_DAG_EDIT},
- "role3": {permissions.RESOURCE_DAG_RUN:
{permissions.ACTION_CAN_CREATE}},
- }
- updated_permissions = {
- "role1": {permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ,
permissions.ACTION_CAN_EDIT}},
- "role2": {permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ,
permissions.ACTION_CAN_EDIT}},
- "role3": {permissions.RESOURCE_DAG_RUN:
{permissions.ACTION_CAN_CREATE}},
- }
-
- with pytest.warns(DeprecationWarning) as deprecation_warnings:
- dag = DAG(dag_id="dag_with_outdated_perms", schedule=None,
access_control=outdated_permissions)
- assert dag.access_control == updated_permissions
- assert len(deprecation_warnings) == 2
- assert "permission is deprecated" in
str(deprecation_warnings[0].message)
- assert "permission is deprecated" in
str(deprecation_warnings[1].message)
-
- with pytest.warns(DeprecationWarning) as deprecation_warnings:
- dag.access_control = outdated_permissions
- assert dag.access_control == updated_permissions
- assert len(deprecation_warnings) == 2
- assert "permission is deprecated" in
str(deprecation_warnings[0].message)
- assert "permission is deprecated" in
str(deprecation_warnings[1].message)
-
@pytest.mark.parametrize(
"fab_version, perms, expected_exception, expected_perms",
[
@@ -3213,20 +2921,6 @@ class TestDagDecorator:
dag = xcom_pass_to_op()
assert dag.params["value"] == value
- @pytest.mark.xfail(strict=True, reason="Airflow 3.0 should not have
warnings here anymore")
- def test_warning_location(self):
- # NOTE: This only works as long as there is some warning we can emit
from `DAG()`
- @dag_decorator(schedule=None)
- def mydag(): ...
-
- with pytest.warns(RemovedInAirflow3Warning) as warnings:
- line = sys._getframe().f_lineno + 1
- mydag()
-
- w = warnings.pop(RemovedInAirflow3Warning)
- assert w.filename == __file__
- assert w.lineno == line
-
@pytest.mark.parametrize(
"run_id, execution_date",
diff --git a/tests/serialization/test_dag_serialization.py
b/tests/serialization/test_dag_serialization.py
index f7aae9ce8a..8311aa77e7 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -317,8 +317,8 @@ def make_user_defined_macro_filter_dag():
(2) templates with function macros have been rendered before
serialization.
"""
- def compute_next_execution_date(dag, execution_date):
- return dag.following_schedule(execution_date)
+ def compute_last_dagrun(dag: DAG):
+ return dag.get_last_dagrun(include_externally_triggered=True)
default_args = {"start_date": datetime(2019, 7, 10)}
dag = DAG(
@@ -326,14 +326,14 @@ def make_user_defined_macro_filter_dag():
schedule=None,
default_args=default_args,
user_defined_macros={
- "next_execution_date": compute_next_execution_date,
+ "last_dagrun": compute_last_dagrun,
},
user_defined_filters={"hello": lambda name: f"Hello {name}"},
catchup=False,
)
BashOperator(
task_id="echo",
- bash_command='echo "{{ next_execution_date(dag, execution_date) }}"',
+ bash_command='echo "{{ last_dagrun(dag) }}"',
dag=dag,
)
return {dag.dag_id: dag}