This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1fcbde2e32 Remove deprecated and unused methods / properties on DAG (#41440)
1fcbde2e32 is described below

commit 1fcbde2e32970bb807089b19ecf04976c17462f3
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Aug 13 18:02:55 2024 -0700

    Remove deprecated and unused methods / properties on DAG (#41440)
    
    * Remove deprecated and unused methods / properties on DAG
    
    (cherry picked from commit 6bd4f83062151d427dab764bca123ba396eda6c0)
    
    * add newsfragment
    
    * fix test
    
    * fix test
    
    * Fix formatting
    
    ---------
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 airflow/models/dag.py               | 233 ------------------------------------
 newsfragments/41440.significant.rst |  16 +++
 tests/jobs/test_scheduler_job.py    |  21 ++--
 tests/models/test_dag.py            |  14 ---
 4 files changed, 27 insertions(+), 257 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 7b762aa18d..50e2222bd0 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -131,7 +131,6 @@ from airflow.timetables.simple import (
 from airflow.timetables.trigger import CronTriggerTimetable
 from airflow.utils import timezone
 from airflow.utils.dag_cycle_tester import check_cycle
-from airflow.utils.dates import cron_presets, date_range as utils_date_range
 from airflow.utils.decorators import fixup_decorator_warning_stack
 from airflow.utils.helpers import at_most_one, exactly_one, validate_instance_args, validate_key
 from airflow.utils.log.logging_mixin import LoggingMixin
@@ -944,60 +943,6 @@ class DAG(LoggingMixin):
 
         return updated_access_control
 
-    def date_range(
-        self,
-        start_date: pendulum.DateTime,
-        num: int | None = None,
-        end_date: datetime | None = None,
-    ) -> list[datetime]:
-        message = "`DAG.date_range()` is deprecated."
-        if num is not None:
-            warnings.warn(message, category=RemovedInAirflow3Warning, 
stacklevel=2)
-            with warnings.catch_warnings():
-                warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-                return utils_date_range(
-                    start_date=start_date, num=num, 
delta=self.normalized_schedule_interval
-                )
-        message += " Please use `DAG.iter_dagrun_infos_between(..., 
align=False)` instead."
-        warnings.warn(message, category=RemovedInAirflow3Warning, stacklevel=2)
-        if end_date is None:
-            coerced_end_date = timezone.utcnow()
-        else:
-            coerced_end_date = end_date
-        it = self.iter_dagrun_infos_between(start_date, 
pendulum.instance(coerced_end_date), align=False)
-        return [info.logical_date for info in it]
-
-    def is_fixed_time_schedule(self):
-        """
-        Figures out if the schedule has a fixed time (e.g. 3 AM every day).
-
-        Detection is done by "peeking" the next two cron trigger time; if the
-        two times have the same minute and hour value, the schedule is fixed,
-        and we *don't* need to perform the DST fix.
-
-        This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).
-
-        Do not try to understand what this actually means. It is old logic that
-        should not be used anywhere.
-        """
-        warnings.warn(
-            "`DAG.is_fixed_time_schedule()` is deprecated.",
-            category=RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-
-        from airflow.timetables._cron import CronMixin
-
-        if not isinstance(self.timetable, CronMixin):
-            return True
-
-        from croniter import croniter
-
-        cron = croniter(self.timetable._expression)
-        next_a = cron.get_next(datetime)
-        next_b = cron.get_next(datetime)
-        return next_b.minute == next_a.minute and next_b.hour == next_a.hour
-
     def following_schedule(self, dttm):
         """
         Calculate the following schedule for this dag in UTC.
@@ -1162,21 +1107,6 @@ class DAG(LoggingMixin):
             info = None
         return info
 
-    def next_dagrun_after_date(self, date_last_automated_dagrun: 
pendulum.DateTime | None):
-        warnings.warn(
-            "`DAG.next_dagrun_after_date()` is deprecated. Please use 
`DAG.next_dagrun_info()` instead.",
-            category=RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        if date_last_automated_dagrun is None:
-            data_interval = None
-        else:
-            data_interval = 
self.infer_automated_data_interval(date_last_automated_dagrun)
-        info = self.next_dagrun_info(data_interval)
-        if info is None:
-            return None
-        return info.run_after
-
     @functools.cached_property
     def _time_restriction(self) -> TimeRestriction:
         start_dates = [t.start_date for t in self.tasks if t.start_date]
@@ -1267,46 +1197,6 @@ class DAG(LoggingMixin):
                 )
                 break
 
-    def get_run_dates(self, start_date, end_date=None) -> list:
-        """
-        Return a list of dates between the interval received as parameter 
using this dag's schedule interval.
-
-        Returned dates can be used for execution dates.
-
-        :param start_date: The start date of the interval.
-        :param end_date: The end date of the interval. Defaults to 
``timezone.utcnow()``.
-        :return: A list of dates within the interval following the dag's 
schedule.
-        """
-        warnings.warn(
-            "`DAG.get_run_dates()` is deprecated. Please use 
`DAG.iter_dagrun_infos_between()` instead.",
-            category=RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        earliest = timezone.coerce_datetime(start_date)
-        if end_date is None:
-            latest = pendulum.now(timezone.utc)
-        else:
-            latest = timezone.coerce_datetime(end_date)
-        return [info.logical_date for info in 
self.iter_dagrun_infos_between(earliest, latest)]
-
-    def normalize_schedule(self, dttm):
-        warnings.warn(
-            "`DAG.normalize_schedule()` is deprecated.",
-            category=RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            following = self.following_schedule(dttm)
-        if not following:  # in case of @once
-            return dttm
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            previous_of_following = self.previous_schedule(following)
-        if previous_of_following != dttm:
-            return following
-        return dttm
-
     @provide_session
     def get_last_dagrun(self, session=NEW_SESSION, include_externally_triggered=False):
         return get_last_dagrun(
@@ -1330,43 +1220,6 @@ class DAG(LoggingMixin):
     def dag_id(self, value: str) -> None:
         self._dag_id = value
 
-    @property
-    def full_filepath(self) -> str:
-        """
-        Full file path to the DAG.
-
-        :meta private:
-        """
-        warnings.warn(
-            "DAG.full_filepath is deprecated in favour of fileloc",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        return self.fileloc
-
-    @full_filepath.setter
-    def full_filepath(self, value) -> None:
-        warnings.warn(
-            "DAG.full_filepath is deprecated in favour of fileloc",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        self.fileloc = value
-
-    @property
-    def concurrency(self) -> int:
-        # TODO: Remove in Airflow 3.0
-        warnings.warn(
-            "The 'DAG.concurrency' attribute is deprecated. Please use 
'DAG.max_active_tasks'.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        return self._max_active_tasks
-
-    @concurrency.setter
-    def concurrency(self, value: int):
-        self._max_active_tasks = value
-
     @property
     def max_active_tasks(self) -> int:
         return self._max_active_tasks
@@ -1438,20 +1291,6 @@ class DAG(LoggingMixin):
     def task_group(self) -> TaskGroup:
         return self._task_group
 
-    @property
-    def filepath(self) -> str:
-        """
-        Relative file path to the DAG.
-
-        :meta private:
-        """
-        warnings.warn(
-            "filepath is deprecated, use relative_fileloc instead",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        return str(self.relative_fileloc)
-
     @property
     def relative_fileloc(self) -> pathlib.Path:
         """File location of the importable dag 'file' relative to the 
configured DAGs folder."""
@@ -1496,16 +1335,6 @@ class DAG(LoggingMixin):
         )
         return total_tasks >= self.max_active_tasks
 
-    @property
-    def concurrency_reached(self):
-        """Use `airflow.models.DAG.get_concurrency_reached`, this attribute is 
deprecated."""
-        warnings.warn(
-            "This attribute is deprecated. Please use 
`airflow.models.DAG.get_concurrency_reached` method.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        return self.get_concurrency_reached()
-
     @provide_session
     def get_is_active(self, session=NEW_SESSION) -> None:
         """Return a boolean indicating whether this DAG is active."""
@@ -1526,21 +1355,6 @@ class DAG(LoggingMixin):
         )
         return self.get_is_paused()
 
-    @property
-    def normalized_schedule_interval(self) -> ScheduleInterval:
-        warnings.warn(
-            "DAG.normalized_schedule_interval() is deprecated.",
-            category=RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        if isinstance(self.schedule_interval, str) and self.schedule_interval 
in cron_presets:
-            _schedule_interval: ScheduleInterval = 
cron_presets.get(self.schedule_interval)
-        elif self.schedule_interval == "@once":
-            _schedule_interval = None
-        else:
-            _schedule_interval = self.schedule_interval
-        return _schedule_interval
-
     @staticmethod
     @internal_api_call
     @provide_session
@@ -1724,16 +1538,6 @@ class DAG(LoggingMixin):
         """Return the latest date for which at least one dag run exists."""
         return session.scalar(select(func.max(DagRun.execution_date)).where(DagRun.dag_id == self.dag_id))
 
-    @property
-    def latest_execution_date(self):
-        """Use `airflow.models.DAG.get_latest_execution_date`, this attribute 
is deprecated."""
-        warnings.warn(
-            "This attribute is deprecated. Please use 
`airflow.models.DAG.get_latest_execution_date`.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        return self.get_latest_execution_date()
-
     def resolve_template_files(self):
         for t in self.tasks:
             t.resolve_template_files()
@@ -2264,28 +2068,6 @@ class DAG(LoggingMixin):
 
         return tuple(nested_topo(self.task_group))
 
-    @provide_session
-    def set_dag_runs_state(
-        self,
-        state: DagRunState = DagRunState.RUNNING,
-        session: Session = NEW_SESSION,
-        start_date: datetime | None = None,
-        end_date: datetime | None = None,
-        dag_ids: list[str] | None = None,
-    ) -> None:
-        warnings.warn(
-            "This method is deprecated and will be removed in a future 
version.",
-            RemovedInAirflow3Warning,
-            stacklevel=3,
-        )
-        dag_ids = dag_ids or [self.dag_id]
-        query = update(DagRun).where(DagRun.dag_id.in_(dag_ids))
-        if start_date:
-            query = query.where(DagRun.execution_date >= start_date)
-        if end_date:
-            query = query.where(DagRun.execution_date <= end_date)
-        
session.execute(query.values(state=state).execution_options(synchronize_session="fetch"))
-
     @provide_session
     def clear(
         self,
@@ -3055,21 +2837,6 @@ class DAG(LoggingMixin):
         )
         return run
 
-    @classmethod
-    @provide_session
-    def bulk_sync_to_db(
-        cls,
-        dags: Collection[DAG],
-        session=NEW_SESSION,
-    ):
-        """Use `airflow.models.DAG.bulk_write_to_db`, this method is 
deprecated."""
-        warnings.warn(
-            "This method is deprecated and will be removed in a future 
version. Please use bulk_write_to_db",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        return cls.bulk_write_to_db(dags=dags, session=session)
-
     @classmethod
     @provide_session
     def bulk_write_to_db(
diff --git a/newsfragments/41440.significant.rst b/newsfragments/41440.significant.rst
new file mode 100644
index 0000000000..4f819bb4d8
--- /dev/null
+++ b/newsfragments/41440.significant.rst
@@ -0,0 +1,16 @@
+Removed unused methods / properties in models/dag.py
+
+Methods removed:
+  * date_range
+  * is_fixed_time_schedule
+  * next_dagrun_after_date
+  * get_run_dates
+  * normalize_schedule
+  * full_filepath
+  * concurrency
+  * filepath
+  * concurrency_reached
+  * normalized_schedule_interval
+  * latest_execution_date
+  * set_dag_runs_state
+  * bulk_sync_to_db
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 8fdbf4826d..8475e6c4cb 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -5284,16 +5284,17 @@ class TestSchedulerJob:
             self.job_runner._find_zombies()
 
         scheduler_job.executor.callback_sink.send.assert_called_once()
-        requests = scheduler_job.executor.callback_sink.send.call_args.args
-        assert 1 == len(requests)
-        assert requests[0].full_filepath == dag.fileloc
-        assert requests[0].msg == 
str(self.job_runner._generate_zombie_message_details(ti))
-        assert requests[0].is_failure_callback is True
-        assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance)
-        assert ti.dag_id == requests[0].simple_task_instance.dag_id
-        assert ti.task_id == requests[0].simple_task_instance.task_id
-        assert ti.run_id == requests[0].simple_task_instance.run_id
-        assert ti.map_index == requests[0].simple_task_instance.map_index
+        callback_requests = 
scheduler_job.executor.callback_sink.send.call_args.args
+        assert len(callback_requests) == 1
+        callback_request = callback_requests[0]
+        assert isinstance(callback_request.simple_task_instance, 
SimpleTaskInstance)
+        assert callback_request.full_filepath == dag.fileloc
+        assert callback_request.msg == 
str(self.job_runner._generate_zombie_message_details(ti))
+        assert callback_request.is_failure_callback is True
+        assert callback_request.simple_task_instance.dag_id == ti.dag_id
+        assert callback_request.simple_task_instance.task_id == ti.task_id
+        assert callback_request.simple_task_instance.run_id == ti.run_id
+        assert callback_request.simple_task_instance.map_index == ti.map_index
 
         with create_session() as session:
             session.query(TaskInstance).delete()
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 2b3961f1e6..95ec87d72b 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -2588,20 +2588,6 @@ my_postgres_conn:
             data_interval=(TEST_DATE, TEST_DATE),
         )
 
-    def test_return_date_range_with_num_method(self):
-        start_date = TEST_DATE
-        delta = timedelta(days=1)
-
-        dag = DAG("dummy-dag", schedule=delta, start_date=start_date)
-        with pytest.warns(RemovedInAirflow3Warning, 
match=r"`DAG.date_range\(\)` is deprecated."):
-            dag_dates = dag.date_range(start_date=start_date, num=3)
-
-        assert dag_dates == [
-            start_date,
-            start_date + delta,
-            start_date + 2 * delta,
-        ]
-
     def test_dag_owner_links(self):
         dag = DAG(
             "dag",

Reply via email to