This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7edec782496 Remove align param from iter dagrun infos (#61420)
7edec782496 is described below
commit 7edec7824967c69f84234b9aafa5f0568b3e9920
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Feb 4 08:10:31 2026 -0800
Remove align param from iter dagrun infos (#61420)
This param does not appear to be needed. It is only set to False when
iter_dagrun_infos_between is called from get_run_ids, which is in turn called from
set_state; set_state is only invoked when marking tasks as failed etc., and only
under certain conditions, namely when clearing "past" or "future" runs. I think it
would only make a difference if the object run was manually triggered and thus did
not align with the timetable. We can handle that case by simply including the
object run in the returned run ids.
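
For illustration, a minimal self-contained sketch of that idea (the dates are
hypothetical; the real code lives in get_run_ids, see the diff below): if the
object run was triggered manually at an off-schedule time while the timetable
only yields midnights, seeding the set keeps that run in the lookup.

    import datetime

    # Hypothetical values for illustration only.
    current_logical_date = datetime.datetime(2026, 2, 4, 3, 17)  # manually triggered, off-schedule
    timetable_dates = [datetime.datetime(2026, 2, 4), datetime.datetime(2026, 2, 5)]  # aligned runs

    dates = {current_logical_date}   # seed with the object run's logical date
    dates.update(timetable_dates)    # then add the timetable-derived dates

    assert current_logical_date in dates  # the off-schedule run is still looked up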
---
airflow-core/src/airflow/api/common/mark_tasks.py | 7 ++++---
.../src/airflow/serialization/definitions/dag.py | 22 ----------------------
airflow-core/tests/unit/models/test_dag.py | 3 +--
3 files changed, 5 insertions(+), 27 deletions(-)
diff --git a/airflow-core/src/airflow/api/common/mark_tasks.py b/airflow-core/src/airflow/api/common/mark_tasks.py
index 09fa9cd5c5e..bb7fee0c2bf 100644
--- a/airflow-core/src/airflow/api/common/mark_tasks.py
+++ b/airflow-core/src/airflow/api/common/mark_tasks.py
@@ -185,11 +185,12 @@ def get_run_ids(dag: SerializedDAG, run_id: str, future: bool, past: bool, sessi
     elif not dag.timetable.periodic:
         run_ids = [run_id]
     else:
-        dates = [
+        dates = {current_logical_date}
+        dates.update(
             info.logical_date
-            for info in dag.iter_dagrun_infos_between(start_date, end_date, align=False)
+            for info in dag.iter_dagrun_infos_between(start_date, end_date)
             if info.logical_date  # todo: AIP-76 this will not find anything where logical date is null
-        ]
+        )
         run_ids = [dr.run_id for dr in DagRun.find(dag_id=dag.dag_id, logical_date=dates, session=session)]
     return run_ids
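
One consequence of switching from a list to a set (hypothetical dates, sketch
only): when the object run happens to fall on the schedule anyway, its logical
date is simply absorbed rather than duplicated in the DagRun.find lookup.

    import datetime

    current_logical_date = datetime.datetime(2026, 2, 4)  # happens to be on schedule
    dates = {current_logical_date}
    dates.update([datetime.datetime(2026, 2, 3), datetime.datetime(2026, 2, 4)])

    assert dates == {datetime.datetime(2026, 2, 3), datetime.datetime(2026, 2, 4)}  # no duplicate entry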
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py
index ea3009c5645..bfdf02bd3ac 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -434,8 +434,6 @@ class SerializedDAG:
         self,
         earliest: datetime.datetime | None,
         latest: datetime.datetime,
-        *,
-        align: bool = True,
     ) -> Iterable[DagRunInfo]:
         """
         Yield DagRunInfo using this DAG's timetable between given interval.
@@ -443,17 +441,6 @@ class SerializedDAG:
         DagRunInfo instances yielded if their ``logical_date`` is not earlier
         than ``earliest``, nor later than ``latest``. The instances are ordered
         by their ``logical_date`` from earliest to latest.
-
-        If ``align`` is ``False``, the first run will happen immediately on
-        ``earliest``, even if it does not fall on the logical timetable schedule.
-        The default is ``True``.
-
-        Example: A DAG is scheduled to run every midnight (``0 0 * * *``). If
-        ``earliest`` is ``2021-06-03 23:00:00``, the first DagRunInfo would be
-        ``2021-06-03 23:00:00`` if ``align=False``, and ``2021-06-04 00:00:00``
-        if ``align=True``.
-
-        # see issue https://github.com/apache/airflow/issues/60455
         """
         if isinstance(self.timetable, CronPartitionTimetable):
             # todo: AIP-76 need to update this so that it handles partitions
@@ -481,21 +468,12 @@ class SerializedDAG:
             info = None
         if info is None:
-            # No runs to be scheduled between the user-supplied timeframe. But
-            # if align=False, "invent" a data interval for the timeframe itself.
-            if not align:
-                yield DagRunInfo.interval(earliest, latest)
             return
         if TYPE_CHECKING:
             # todo: AIP-76 after updating this function for partitions, this may not be true
             assert info.data_interval is not None
-        # If align=False and earliest does not fall on the timetable's logical
-        # schedule, "invent" a data interval for it.
-        if not align and info.logical_date != earliest:
-            yield DagRunInfo.interval(earliest, info.data_interval.start)
-
         # Generate naturally according to schedule.
         while info is not None:
             yield info
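
For callers, what remains is the old align=True behavior. A minimal sketch
reusing the example from the removed docstring (assumptions: ``dag`` is a
SerializedDAG whose timetable is the cron schedule ``0 0 * * *``; the variable
names are illustrative and not taken from the codebase).

    import pendulum

    # Assumed: `dag` is a SerializedDAG scheduled with "0 0 * * *" (every midnight).
    earliest = pendulum.datetime(2021, 6, 3, 23, 0, 0)
    latest = pendulum.datetime(2021, 6, 5, 0, 0, 0)

    infos = list(dag.iter_dagrun_infos_between(earliest, latest))
    # The first DagRunInfo is the aligned 2021-06-04 00:00:00 run; no interval
    # is "invented" at 2021-06-03 23:00:00 anymore, since align=False is gone.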
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index 9683a500d95..59a9e153f0a 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -2747,7 +2747,6 @@ def test_iter_dagrun_infos_between(start_date, expected_infos):
     iterator = create_scheduler_dag(dag).iter_dagrun_infos_between(
         earliest=pendulum.instance(start_date),
         latest=pendulum.instance(DEFAULT_DATE),
-        align=True,
     )
     assert expected_infos == list(iterator)
@@ -2784,7 +2783,7 @@ def test_iter_dagrun_infos_between_error(caplog):
     ):
         scheduler_dag = create_scheduler_dag(dag)
-        iterator = scheduler_dag.iter_dagrun_infos_between(earliest=start, latest=end, align=True)
+        iterator = scheduler_dag.iter_dagrun_infos_between(earliest=start, latest=end)
         with caplog.at_level(logging.ERROR):
             infos = list(iterator)