This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f744ad8c5b Update iter_dagrun_infos_between to use next_dagrun_info_v2 (#61465)
3f744ad8c5b is described below

commit 3f744ad8c5badacd8ee1ce045cc92d0dbf855228
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Feb 5 08:19:05 2026 -0800

    Update iter_dagrun_infos_between to use next_dagrun_info_v2 (#61465)
    
    This is so that it works for new timetables that implement next_dagrun_info_v2 (which receives DagRunInfo objects instead of data intervals).
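    
    For context, a rough side-by-side of the two timetable calls as they appear in this change (parameter and return types are inferred from how they are used in the diff below, not taken from the timetable base class):
    
        # v1 API: keyed off the previous data interval.
        info = timetable.next_dagrun_info(
            last_automated_data_interval=None,  # DataInterval | None
            restriction=restriction,            # TimeRestriction
        )
    
        # v2 API: receives the full previous DagRunInfo instead.
        info = timetable.next_dagrun_info_v2(
            last_dagrun_info=None,  # DagRunInfo | None
            restriction=restriction,
        )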
    
    I also remove logic that is no longer needed and simplify the function. Some of it was left over from when the align parameter existed (removed in #61420). Without it, the logic can be condensed, e.g. by not fetching an initial info before starting the loop.
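    
    As a rough illustration of the (unchanged) public behaviour, a caller might use the generator like this; `dag` is assumed to be a SerializedDAG, and the positional signature and the logical_date attribute are taken from the docstring rather than re-verified here:
    
        import pendulum
    
        earliest = pendulum.datetime(2026, 1, 1, tz="UTC")
        latest = pendulum.datetime(2026, 1, 7, tz="UTC")
    
        # Collect the logical dates the timetable would schedule in that window.
        logical_dates = [
            info.logical_date
            for info in dag.iter_dagrun_infos_between(earliest, latest)
        ]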
---
 .../src/airflow/serialization/definitions/dag.py   | 48 +++++++---------------
 airflow-core/tests/unit/models/test_dag.py         |  2 +-
 2 files changed, 15 insertions(+), 35 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py
index bfdf02bd3ac..b1a2dc8da59 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -46,7 +46,6 @@ from airflow.serialization.definitions.deadline import DeadlineAlertFields, Seri
 from airflow.serialization.definitions.param import SerializedParamsDict
 from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
 from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction
-from airflow.timetables.trigger import CronPartitionTimetable
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import DagRunState, TaskInstanceState
 from airflow.utils.types import DagRunType
@@ -441,10 +440,9 @@ class SerializedDAG:
         DagRunInfo instances yielded if their ``logical_date`` is not earlier
         than ``earliest``, nor later than ``latest``. The instances are ordered
         by their ``logical_date`` from earliest to latest.
+
+        # TODO: AIP-76 see issue https://github.com/apache/airflow/issues/60455
         """
-        if isinstance(self.timetable, CronPartitionTimetable):
-            # todo: AIP-76 need to update this so that it handles partitions
-            raise ValueError("Partition-driven timetables not supported yet")
         if earliest is None:
             earliest = self._time_restriction.earliest
         if earliest is None:
@@ -454,41 +452,23 @@ class SerializedDAG:
 
         restriction = TimeRestriction(earliest, latest, catchup=True)
 
+        info = None
         try:
-            info = self.timetable.next_dagrun_info(
-                last_automated_data_interval=None,
-                restriction=restriction,
-            )
+            while True:
+                info = self.timetable.next_dagrun_info_v2(
+                    last_dagrun_info=info,
+                    restriction=restriction,
+                )
+                if info:
+                    yield info
+                else:
+                    break
         except Exception:
             log.exception(
-                "Failed to fetch run info after data interval %s for DAG %r",
-                None,
+                "Failed to fetch run info for Dag '%s'",
                 self.dag_id,
+                last_dagrun_info=info,
             )
-            info = None
-
-        if info is None:
-            return
-
-        if TYPE_CHECKING:
-            # todo: AIP-76 after updating this function for partitions, this may not be true
-            assert info.data_interval is not None
-
-        # Generate naturally according to schedule.
-        while info is not None:
-            yield info
-            try:
-                info = self.timetable.next_dagrun_info(
-                    last_automated_data_interval=info.data_interval,
-                    restriction=restriction,
-                )
-            except Exception:
-                log.exception(
-                    "Failed to fetch run info after data interval %s for DAG 
%r",
-                    info.data_interval if info else "<NONE>",
-                    self.dag_id,
-                )
-                break
 
     @provide_session
     def get_concurrency_reached(self, session=NEW_SESSION) -> bool:
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index 59a9e153f0a..5471fdc0f17 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -2794,7 +2794,7 @@ def test_iter_dagrun_infos_between_error(caplog):
         (
             "airflow.serialization.definitions.dag",
             logging.ERROR,
-            f"Failed to fetch run info after data interval 
{DataInterval(start, end)} for DAG {dag.dag_id!r}",
+            f"Failed to fetch run info for Dag {dag.dag_id!r}",
         ),
     ]
     assert caplog.entries[0].get("exception"), "should contain exception context"
