This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1fcbde2e32 Remove deprecated and unused methods / properties on DAG (#41440)
1fcbde2e32 is described below
commit 1fcbde2e32970bb807089b19ecf04976c17462f3
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Aug 13 18:02:55 2024 -0700
Remove deprecated and unused methods / properties on DAG (#41440)
* Remove deprecated and unused methods / properties on DAG
(cherry picked from commit 6bd4f83062151d427dab764bca123ba396eda6c0)
* add newsfragment
* fix test
* fix test
* Fix formatting
---------
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/models/dag.py | 233 ------------------------------------
newsfragments/41440.significant.rst | 16 +++
tests/jobs/test_scheduler_job.py | 21 ++--
tests/models/test_dag.py | 14 ---
4 files changed, 27 insertions(+), 257 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 7b762aa18d..50e2222bd0 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -131,7 +131,6 @@ from airflow.timetables.simple import (
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import check_cycle
-from airflow.utils.dates import cron_presets, date_range as utils_date_range
from airflow.utils.decorators import fixup_decorator_warning_stack
from airflow.utils.helpers import at_most_one, exactly_one, validate_instance_args, validate_key
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -944,60 +943,6 @@ class DAG(LoggingMixin):
return updated_access_control
- def date_range(
- self,
- start_date: pendulum.DateTime,
- num: int | None = None,
- end_date: datetime | None = None,
- ) -> list[datetime]:
- message = "`DAG.date_range()` is deprecated."
- if num is not None:
- warnings.warn(message, category=RemovedInAirflow3Warning, stacklevel=2)
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", RemovedInAirflow3Warning)
- return utils_date_range(
- start_date=start_date, num=num, delta=self.normalized_schedule_interval
- )
- message += " Please use `DAG.iter_dagrun_infos_between(..., align=False)` instead."
- warnings.warn(message, category=RemovedInAirflow3Warning, stacklevel=2)
- if end_date is None:
- coerced_end_date = timezone.utcnow()
- else:
- coerced_end_date = end_date
- it = self.iter_dagrun_infos_between(start_date, pendulum.instance(coerced_end_date), align=False)
- return [info.logical_date for info in it]
-
- def is_fixed_time_schedule(self):
- """
- Figures out if the schedule has a fixed time (e.g. 3 AM every day).
-
- Detection is done by "peeking" the next two cron trigger time; if the
- two times have the same minute and hour value, the schedule is fixed,
- and we *don't* need to perform the DST fix.
-
- This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).
-
- Do not try to understand what this actually means. It is old logic that
- should not be used anywhere.
- """
- warnings.warn(
- "`DAG.is_fixed_time_schedule()` is deprecated.",
- category=RemovedInAirflow3Warning,
- stacklevel=2,
- )
-
- from airflow.timetables._cron import CronMixin
-
- if not isinstance(self.timetable, CronMixin):
- return True
-
- from croniter import croniter
-
- cron = croniter(self.timetable._expression)
- next_a = cron.get_next(datetime)
- next_b = cron.get_next(datetime)
- return next_b.minute == next_a.minute and next_b.hour == next_a.hour
-
def following_schedule(self, dttm):
"""
Calculate the following schedule for this dag in UTC.
@@ -1162,21 +1107,6 @@ class DAG(LoggingMixin):
info = None
return info
- def next_dagrun_after_date(self, date_last_automated_dagrun: pendulum.DateTime | None):
- warnings.warn(
- "`DAG.next_dagrun_after_date()` is deprecated. Please use `DAG.next_dagrun_info()` instead.",
- category=RemovedInAirflow3Warning,
- stacklevel=2,
- )
- if date_last_automated_dagrun is None:
- data_interval = None
- else:
- data_interval = self.infer_automated_data_interval(date_last_automated_dagrun)
- info = self.next_dagrun_info(data_interval)
- if info is None:
- return None
- return info.run_after
-
@functools.cached_property
def _time_restriction(self) -> TimeRestriction:
start_dates = [t.start_date for t in self.tasks if t.start_date]
@@ -1267,46 +1197,6 @@ class DAG(LoggingMixin):
)
break
- def get_run_dates(self, start_date, end_date=None) -> list:
- """
- Return a list of dates between the interval received as parameter using this dag's schedule interval.
-
- Returned dates can be used for execution dates.
-
- :param start_date: The start date of the interval.
- :param end_date: The end date of the interval. Defaults to ``timezone.utcnow()``.
- :return: A list of dates within the interval following the dag's schedule.
- """
- warnings.warn(
- "`DAG.get_run_dates()` is deprecated. Please use `DAG.iter_dagrun_infos_between()` instead.",
- category=RemovedInAirflow3Warning,
- stacklevel=2,
- )
- earliest = timezone.coerce_datetime(start_date)
- if end_date is None:
- latest = pendulum.now(timezone.utc)
- else:
- latest = timezone.coerce_datetime(end_date)
- return [info.logical_date for info in self.iter_dagrun_infos_between(earliest, latest)]
-
- def normalize_schedule(self, dttm):
- warnings.warn(
- "`DAG.normalize_schedule()` is deprecated.",
- category=RemovedInAirflow3Warning,
- stacklevel=2,
- )
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", RemovedInAirflow3Warning)
- following = self.following_schedule(dttm)
- if not following: # in case of @once
- return dttm
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", RemovedInAirflow3Warning)
- previous_of_following = self.previous_schedule(following)
- if previous_of_following != dttm:
- return following
- return dttm
-
@provide_session
def get_last_dagrun(self, session=NEW_SESSION, include_externally_triggered=False):
return get_last_dagrun(
@@ -1330,43 +1220,6 @@ class DAG(LoggingMixin):
def dag_id(self, value: str) -> None:
self._dag_id = value
- @property
- def full_filepath(self) -> str:
- """
- Full file path to the DAG.
-
- :meta private:
- """
- warnings.warn(
- "DAG.full_filepath is deprecated in favour of fileloc",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- return self.fileloc
-
- @full_filepath.setter
- def full_filepath(self, value) -> None:
- warnings.warn(
- "DAG.full_filepath is deprecated in favour of fileloc",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- self.fileloc = value
-
- @property
- def concurrency(self) -> int:
- # TODO: Remove in Airflow 3.0
- warnings.warn(
- "The 'DAG.concurrency' attribute is deprecated. Please use 'DAG.max_active_tasks'.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- return self._max_active_tasks
-
- @concurrency.setter
- def concurrency(self, value: int):
- self._max_active_tasks = value
-
@property
def max_active_tasks(self) -> int:
return self._max_active_tasks
@@ -1438,20 +1291,6 @@ class DAG(LoggingMixin):
def task_group(self) -> TaskGroup:
return self._task_group
- @property
- def filepath(self) -> str:
- """
- Relative file path to the DAG.
-
- :meta private:
- """
- warnings.warn(
- "filepath is deprecated, use relative_fileloc instead",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- return str(self.relative_fileloc)
-
@property
def relative_fileloc(self) -> pathlib.Path:
"""File location of the importable dag 'file' relative to the configured DAGs folder."""
@@ -1496,16 +1335,6 @@ class DAG(LoggingMixin):
)
return total_tasks >= self.max_active_tasks
- @property
- def concurrency_reached(self):
- """Use `airflow.models.DAG.get_concurrency_reached`, this attribute is deprecated."""
- warnings.warn(
- "This attribute is deprecated. Please use `airflow.models.DAG.get_concurrency_reached` method.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- return self.get_concurrency_reached()
-
@provide_session
def get_is_active(self, session=NEW_SESSION) -> None:
"""Return a boolean indicating whether this DAG is active."""
@@ -1526,21 +1355,6 @@ class DAG(LoggingMixin):
)
return self.get_is_paused()
- @property
- def normalized_schedule_interval(self) -> ScheduleInterval:
- warnings.warn(
- "DAG.normalized_schedule_interval() is deprecated.",
- category=RemovedInAirflow3Warning,
- stacklevel=2,
- )
- if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets:
- _schedule_interval: ScheduleInterval = cron_presets.get(self.schedule_interval)
- elif self.schedule_interval == "@once":
- _schedule_interval = None
- else:
- _schedule_interval = self.schedule_interval
- return _schedule_interval
-
@staticmethod
@internal_api_call
@provide_session
@@ -1724,16 +1538,6 @@ class DAG(LoggingMixin):
"""Return the latest date for which at least one dag run exists."""
return session.scalar(select(func.max(DagRun.execution_date)).where(DagRun.dag_id == self.dag_id))
- @property
- def latest_execution_date(self):
- """Use `airflow.models.DAG.get_latest_execution_date`, this attribute is deprecated."""
- warnings.warn(
- "This attribute is deprecated. Please use `airflow.models.DAG.get_latest_execution_date`.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- return self.get_latest_execution_date()
-
def resolve_template_files(self):
for t in self.tasks:
t.resolve_template_files()
@@ -2264,28 +2068,6 @@ class DAG(LoggingMixin):
return tuple(nested_topo(self.task_group))
- @provide_session
- def set_dag_runs_state(
- self,
- state: DagRunState = DagRunState.RUNNING,
- session: Session = NEW_SESSION,
- start_date: datetime | None = None,
- end_date: datetime | None = None,
- dag_ids: list[str] | None = None,
- ) -> None:
- warnings.warn(
- "This method is deprecated and will be removed in a future version.",
- RemovedInAirflow3Warning,
- stacklevel=3,
- )
- dag_ids = dag_ids or [self.dag_id]
- query = update(DagRun).where(DagRun.dag_id.in_(dag_ids))
- if start_date:
- query = query.where(DagRun.execution_date >= start_date)
- if end_date:
- query = query.where(DagRun.execution_date <= end_date)
- session.execute(query.values(state=state).execution_options(synchronize_session="fetch"))
-
@provide_session
def clear(
self,
@@ -3055,21 +2837,6 @@ class DAG(LoggingMixin):
)
return run
- @classmethod
- @provide_session
- def bulk_sync_to_db(
- cls,
- dags: Collection[DAG],
- session=NEW_SESSION,
- ):
- """Use `airflow.models.DAG.bulk_write_to_db`, this method is deprecated."""
- warnings.warn(
- "This method is deprecated and will be removed in a future version. Please use bulk_write_to_db",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- return cls.bulk_write_to_db(dags=dags, session=session)
-
@classmethod
@provide_session
def bulk_write_to_db(
diff --git a/newsfragments/41440.significant.rst b/newsfragments/41440.significant.rst
new file mode 100644
index 0000000000..4f819bb4d8
--- /dev/null
+++ b/newsfragments/41440.significant.rst
@@ -0,0 +1,16 @@
+Removed unused methods / properties in models/dag.py
+
+Methods removed:
+ * date_range
+ * is_fixed_time_schedule
+ * next_dagrun_after_date
+ * get_run_dates
+ * normalize_schedule
+ * full_filepath
+ * concurrency
+ * filepath
+ * concurrency_reached
+ * normalized_schedule_interval
+ * latest_execution_date
+ * set_dag_runs_state
+ * bulk_sync_to_db
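For reference, a minimal migration sketch (not part of this commit) of the replacements that the removed deprecation warnings pointed to. The dag_id, dates, and variable names below are illustrative assumptions; only the method and attribute names come from the diff above.

import pendulum
from airflow.models.dag import DAG

# Hypothetical DAG used only to demonstrate the replacement calls.
dag = DAG(
    "migration_example",
    schedule="@daily",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
)

# DAG.date_range() / DAG.get_run_dates()  ->  DAG.iter_dagrun_infos_between()
earliest = pendulum.datetime(2024, 1, 1, tz="UTC")
latest = pendulum.datetime(2024, 1, 5, tz="UTC")
logical_dates = [
    info.logical_date
    for info in dag.iter_dagrun_infos_between(earliest, latest, align=False)
]

# DAG.next_dagrun_after_date(date)  ->  DAG.next_dagrun_info(data_interval)
info = dag.next_dagrun_info(None)  # None: no automated run has happened yet
run_after = info.run_after if info else None

# DAG.concurrency            ->  DAG.max_active_tasks
# DAG.full_filepath          ->  DAG.fileloc
# DAG.filepath               ->  DAG.relative_fileloc
# DAG.concurrency_reached    ->  DAG.get_concurrency_reached()
# DAG.latest_execution_date  ->  DAG.get_latest_execution_date()
# DAG.bulk_sync_to_db(dags)  ->  DAG.bulk_write_to_db(dags)
max_tasks = dag.max_active_tasks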
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 8fdbf4826d..8475e6c4cb 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -5284,16 +5284,17 @@ class TestSchedulerJob:
self.job_runner._find_zombies()
scheduler_job.executor.callback_sink.send.assert_called_once()
- requests = scheduler_job.executor.callback_sink.send.call_args.args
- assert 1 == len(requests)
- assert requests[0].full_filepath == dag.fileloc
- assert requests[0].msg == str(self.job_runner._generate_zombie_message_details(ti))
- assert requests[0].is_failure_callback is True
- assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance)
- assert ti.dag_id == requests[0].simple_task_instance.dag_id
- assert ti.task_id == requests[0].simple_task_instance.task_id
- assert ti.run_id == requests[0].simple_task_instance.run_id
- assert ti.map_index == requests[0].simple_task_instance.map_index
+ callback_requests = scheduler_job.executor.callback_sink.send.call_args.args
+ assert len(callback_requests) == 1
+ callback_request = callback_requests[0]
+ assert isinstance(callback_request.simple_task_instance, SimpleTaskInstance)
+ assert callback_request.full_filepath == dag.fileloc
+ assert callback_request.msg == str(self.job_runner._generate_zombie_message_details(ti))
+ assert callback_request.is_failure_callback is True
+ assert callback_request.simple_task_instance.dag_id == ti.dag_id
+ assert callback_request.simple_task_instance.task_id == ti.task_id
+ assert callback_request.simple_task_instance.run_id == ti.run_id
+ assert callback_request.simple_task_instance.map_index == ti.map_index
with create_session() as session:
session.query(TaskInstance).delete()
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 2b3961f1e6..95ec87d72b 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -2588,20 +2588,6 @@ my_postgres_conn:
data_interval=(TEST_DATE, TEST_DATE),
)
- def test_return_date_range_with_num_method(self):
- start_date = TEST_DATE
- delta = timedelta(days=1)
-
- dag = DAG("dummy-dag", schedule=delta, start_date=start_date)
- with pytest.warns(RemovedInAirflow3Warning, match=r"`DAG.date_range\(\)` is deprecated."):
- dag_dates = dag.date_range(start_date=start_date, num=3)
-
- assert dag_dates == [
- start_date,
- start_date + delta,
- start_date + 2 * delta,
- ]
-
def test_dag_owner_links(self):
dag = DAG(
"dag",