This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push:
     new 17e63c4fe43 [v3-0-test] fix(serialized_objects): try to infer data interval if it's none (#51530) (#51913)
17e63c4fe43 is described below
commit 17e63c4fe4361c9db6c6d587d763ee78e7bfe693
Author: Wei Lee <[email protected]>
AuthorDate: Sun Jun 22 03:25:42 2025 +0800
    [v3-0-test] fix(serialized_objects): try to infer data interval if it's none (#51530) (#51913)
* fix(serialized_objects): try to infer data interval if it's none
it handles pre AIP-39 dag runs
* test(serialization_objects): add test case
test_serialized_dag_get_run_data_interval
* fix(serialized_objects): do not infer data interval if the logical_date
is None
* feat(serialized_objects): allow data_interval to be none
(cherry picked from commit 4f26e456cb8d5a76e985696dfa0efcaf6cada959)
---
.../airflow/serialization/serialized_objects.py | 8 +--
.../unit/serialization/test_serialized_objects.py | 76 +++++++++++++++++++++-
2 files changed, 78 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 2a8f3cd9da6..f3e49c4fabd 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -2177,16 +2177,14 @@ class LazyDeserializedDAG(pydantic.BaseModel):
if isinstance(obj, of_type):
yield task["task_id"], obj
-    def get_run_data_interval(self, run: DagRun) -> DataInterval:
+    def get_run_data_interval(self, run: DagRun) -> DataInterval | None:
         """Get the data interval of this run."""
         if run.dag_id is not None and run.dag_id != self.dag_id:
             raise ValueError(f"Arguments refer to different DAGs: {self.dag_id} != {run.dag_id}")
         data_interval = _get_model_data_interval(run, "data_interval_start", "data_interval_end")
-        # the older implementation has call to infer_automated_data_interval if data_interval is None, do we want to keep that or raise
-        # an exception?
-        if data_interval is None:
-            raise ValueError(f"Cannot calculate data interval for run {run}")
+        if data_interval is None and run.logical_date is not None:
+            data_interval = self._real_dag.timetable.infer_manual_data_interval(run_after=run.logical_date)
         return data_interval
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 6f287289673..a0167f1b7db 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -43,12 +43,20 @@ from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.triggers.file import FileDeleteTrigger
-from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAliasEvent, AssetUniqueKey, AssetWatcher
+from airflow.sdk import BaseOperator
+from airflow.sdk.definitions.asset import (
+ Asset,
+ AssetAlias,
+ AssetAliasEvent,
+ AssetUniqueKey,
+ AssetWatcher,
+)
from airflow.sdk.definitions.decorators import task
from airflow.sdk.definitions.param import Param
 from airflow.sdk.execution_time.context import OutletEventAccessor, OutletEventAccessors
from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
 from airflow.serialization.serialized_objects import BaseSerialization, LazyDeserializedDAG, SerializedDAG
+from airflow.timetables.base import DataInterval
from airflow.triggers.base import BaseTrigger
from airflow.utils import timezone
from airflow.utils.db import LazySelectSequence
@@ -494,6 +502,47 @@ def
test_serialized_dag_mapped_task_has_task_concurrency_limits(dag_maker, concu
assert lazy_serialized_dag.has_task_concurrency_limits
+@pytest.mark.db_test
+@pytest.mark.parametrize(
+    "create_dag_run_kwargs",
+    (
+        {},
+        {
+            "data_interval": None,
+            "logical_date": pendulum.DateTime(2016, 1, 1, 0, 0, 0, tzinfo=Timezone("UTC")),
+        },
+        {"data_interval": None, "logical_date": None},
+    ),
+    ids=["post-AIP-39", "pre-AIP-39-should-infer", "pre-AIP-39"],
+)
+def test_serialized_dag_get_run_data_interval(create_dag_run_kwargs, dag_maker, session):
+    """Test whether LazyDeserializedDAG can correctly get dag run data_interval
+
+    post-AIP-39: the dag run itself contains both data_interval start and data_interval end, and thus can
+        be retrieved directly
+    pre-AIP-39-should-infer: the dag run itself has neither data_interval_start nor data_interval_end,
+        and thus needs to infer the data_interval from its timetable
+    pre-AIP-39: the dag run itself has neither data_interval_start nor data_interval_end, and its logical_date
+        is none. it should return data_interval as none
+    """
+    with dag_maker(dag_id="test_dag", session=session, serialized=True) as dag:
+        BaseOperator(task_id="test_task")
+    session.commit()
+
+    dr = dag_maker.create_dagrun(**create_dag_run_kwargs)
+    ser_dict = SerializedDAG.to_dict(dag)
+    deser_dag = LazyDeserializedDAG(data=ser_dict)
+    if "logical_date" in create_dag_run_kwargs and create_dag_run_kwargs["logical_date"] is None:
+        data_interval = deser_dag.get_run_data_interval(dr)
+        assert data_interval is None
+    else:
+        data_interval = deser_dag.get_run_data_interval(dr)
+        assert data_interval == DataInterval(
+            start=pendulum.DateTime(2015, 12, 31, 0, 0, 0, tzinfo=Timezone("UTC")),
+            end=pendulum.DateTime(2016, 1, 1, 0, 0, 0, tzinfo=Timezone("UTC")),
+        )
+
+
def test_get_task_assets():
asset1 = Asset("1")
with DAG("testdag") as source_dag:
@@ -510,3 +559,28 @@ def test_get_task_assets():
("c", asset1),
("d", asset1),
]
+
+
+def test_lazy_dag_run_interval_wrong_dag():
+ lazy = LazyDeserializedDAG(data={"dag": {"dag_id": "dag1"}})
+
+ with pytest.raises(ValueError, match="different DAGs"):
+ lazy.get_run_data_interval(DAG_RUN)
+
+
+def test_lazy_dag_run_interval_missing_interval():
+ lazy = LazyDeserializedDAG(data={"dag": {"dag_id": "test_dag_id"}})
+
+    with pytest.raises(ValueError, match="Unsure how to deserialize version '<not present>'"):
+ lazy.get_run_data_interval(DAG_RUN)
+
+
+def test_lazy_dag_run_interval_success():
+ run = DAG_RUN
+ run.data_interval_start = datetime(2025, 1, 1)
+ run.data_interval_end = datetime(2025, 1, 2)
+
+ lazy = LazyDeserializedDAG(data={"dag": {"dag_id": "test_dag_id"}})
+ interval = lazy.get_run_data_interval(run)
+
+ assert isinstance(interval, DataInterval)