This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 474662d80c2 [v3-1-test] fix(models/dag): handle pre AIP-39 DagRuns (#58229) (#58773)
474662d80c2 is described below
commit 474662d80c269cfe1dae87ed9632085260e766d2
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Nov 28 13:09:04 2025 +0800
[v3-1-test] fix(models/dag): handle pre AIP-39 DagRuns (#58229) (#58773)
Co-authored-by: Wei Lee <[email protected]>
---
airflow-core/src/airflow/models/dag.py | 9 ++++++--
airflow-core/tests/unit/models/test_dag.py | 34 ++++++++++++++++++++++++++++++
2 files changed, 41 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py
index 4f6f22b2f80..1dfe5c3b46e 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -135,9 +135,14 @@ def get_run_data_interval(timetable: Timetable, run: DagRun) -> DataInterval:

     :meta private:
     """
-    data_interval = _get_model_data_interval(run, "data_interval_start", "data_interval_end")
-    if data_interval is not None:
+    if (
+        data_interval := _get_model_data_interval(run, "data_interval_start", "data_interval_end")
+    ) is not None:
+        return data_interval
+
+    if (data_interval := timetable.infer_manual_data_interval(run_after=run.run_after)) is not None:
         return data_interval
+
     # Compatibility: runs created before AIP-39 implementation don't have an
     # explicit data interval. Try to infer from the logical date.
     return infer_automated_data_interval(timetable, run.logical_date)
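
For reference, after this hunk the helper reads roughly as below (a consolidated sketch of the change above; _get_model_data_interval and infer_automated_data_interval are the existing helpers in airflow/models/dag.py):

    def get_run_data_interval(timetable: Timetable, run: DagRun) -> DataInterval:
        # 1. Prefer the interval stored on the DagRun row (AIP-39 and later).
        if (
            data_interval := _get_model_data_interval(run, "data_interval_start", "data_interval_end")
        ) is not None:
            return data_interval

        # 2. Otherwise let the timetable infer a manual interval from run_after.
        if (data_interval := timetable.infer_manual_data_interval(run_after=run.run_after)) is not None:
            return data_interval

        # 3. Compatibility: runs created before AIP-39 carry no explicit interval;
        #    infer one from the logical date instead.
        return infer_automated_data_interval(timetable, run.logical_date)
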
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index febed30fbd2..e9c77912e9f 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -52,6 +52,7 @@ from airflow.models.dag import (
     DagTag,
     get_asset_triggered_next_run_info,
     get_next_data_interval,
+    get_run_data_interval,
 )
 from airflow.models.dagbag import DBDagBag
 from airflow.models.dagbundle import DagBundleModel
@@ -3378,3 +3379,36 @@ def test_disable_bundle_versioning(disable, bundle_version, expected, dag_maker,
     # but it only gets stamped on the dag run when bundle versioning not disabled
     assert dr.bundle_version == expected
+
+
+def test_get_run_data_interval():
+    with DAG("dag", schedule=None, start_date=DEFAULT_DATE) as dag:
+        EmptyOperator(task_id="empty_task")
+
+    dr = _create_dagrun(
+        dag,
+        logical_date=timezone.utcnow(),
+        data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+        run_type=DagRunType.MANUAL,
+    )
+    assert get_run_data_interval(dag.timetable, dr) == DataInterval(start=DEFAULT_DATE, end=DEFAULT_DATE)
+
+
+def test_get_run_data_interval_pre_aip_39():
+    with DAG(
+        "dag",
+        schedule="0 0 * * *",
+        start_date=DEFAULT_DATE,
+    ) as dag:
+        EmptyOperator(task_id="empty_task")
+
+    current_ts = timezone.utcnow()
+    dr = _create_dagrun(
+        dag,
+        logical_date=current_ts,
+        data_interval=(None, None),
+        run_type=DagRunType.MANUAL,
+    )
+    ds_start = current_ts.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=1)
+    ds_end = current_ts.replace(hour=0, minute=0, second=0, microsecond=0)
+    assert get_run_data_interval(dag.timetable, dr) == DataInterval(start=ds_start, end=ds_end)
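
The walrus checks in the patched helper rely on _get_model_data_interval returning None when the run has no stored interval, which is what triggers the pre AIP-39 fallback exercised by the second test. A minimal sketch of that assumed contract (hypothetical illustration only, not the actual helper in airflow/models/dag.py):

    from airflow.timetables.base import DataInterval

    def _get_model_data_interval_sketch(instance, start_field: str, end_field: str) -> DataInterval | None:
        # Pre AIP-39 DagRuns (and manual runs created without an interval) have
        # neither column populated, so signal "no stored interval" with None and
        # let the caller fall back to inferring one from the timetable.
        start = getattr(instance, start_field)
        end = getattr(instance, end_field)
        if start is None or end is None:
            return None
        return DataInterval(start=start, end=end)
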