This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new 17e63c4fe43 [v3-0-test] fix(serialized_objects): try to infer data 
interval if it's none (#51530) (#51913)
17e63c4fe43 is described below

commit 17e63c4fe4361c9db6c6d587d763ee78e7bfe693
Author: Wei Lee <[email protected]>
AuthorDate: Sun Jun 22 03:25:42 2025 +0800

    [v3-0-test] fix(serialized_objects): try to infer data interval if it's 
none (#51530) (#51913)
    
    * fix(serialized_objects): try to infer data interval if it's none
    
    It handles pre-AIP-39 DAG runs.
    
    * test(serialization_objects): add test case 
test_serialized_dag_get_run_data_interval
    
    * fix(serialized_objects): do not infer data interval if the logical_date 
is None
    
    * feat(serialized_objects): allow data_interval to be none
    (cherry picked from commit 4f26e456cb8d5a76e985696dfa0efcaf6cada959)
---
 .../airflow/serialization/serialized_objects.py    |  8 +--
 .../unit/serialization/test_serialized_objects.py  | 76 +++++++++++++++++++++-
 2 files changed, 78 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py 
b/airflow-core/src/airflow/serialization/serialized_objects.py
index 2a8f3cd9da6..f3e49c4fabd 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -2177,16 +2177,14 @@ class LazyDeserializedDAG(pydantic.BaseModel):
                     if isinstance(obj, of_type):
                         yield task["task_id"], obj
 
-    def get_run_data_interval(self, run: DagRun) -> DataInterval:
+    def get_run_data_interval(self, run: DagRun) -> DataInterval | None:
         """Get the data interval of this run."""
         if run.dag_id is not None and run.dag_id != self.dag_id:
             raise ValueError(f"Arguments refer to different DAGs: 
{self.dag_id} != {run.dag_id}")
 
         data_interval = _get_model_data_interval(run, "data_interval_start", 
"data_interval_end")
-        # the older implementation has call to infer_automated_data_interval 
if data_interval is None, do we want to keep that or raise
-        # an exception?
-        if data_interval is None:
-            raise ValueError(f"Cannot calculate data interval for run {run}")
+        if data_interval is None and run.logical_date is not None:
+            data_interval = 
self._real_dag.timetable.infer_manual_data_interval(run_after=run.logical_date)
 
         return data_interval
 
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py 
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 6f287289673..a0167f1b7db 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -43,12 +43,20 @@ from airflow.providers.standard.operators.bash import 
BashOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.python import PythonOperator
 from airflow.providers.standard.triggers.file import FileDeleteTrigger
-from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAliasEvent, 
AssetUniqueKey, AssetWatcher
+from airflow.sdk import BaseOperator
+from airflow.sdk.definitions.asset import (
+    Asset,
+    AssetAlias,
+    AssetAliasEvent,
+    AssetUniqueKey,
+    AssetWatcher,
+)
 from airflow.sdk.definitions.decorators import task
 from airflow.sdk.definitions.param import Param
 from airflow.sdk.execution_time.context import OutletEventAccessor, 
OutletEventAccessors
 from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
 from airflow.serialization.serialized_objects import BaseSerialization, 
LazyDeserializedDAG, SerializedDAG
+from airflow.timetables.base import DataInterval
 from airflow.triggers.base import BaseTrigger
 from airflow.utils import timezone
 from airflow.utils.db import LazySelectSequence
@@ -494,6 +502,47 @@ def 
test_serialized_dag_mapped_task_has_task_concurrency_limits(dag_maker, concu
     assert lazy_serialized_dag.has_task_concurrency_limits
 
 
[email protected]_test
[email protected](
+    "create_dag_run_kwargs",
+    (
+        {},
+        {
+            "data_interval": None,
+            "logical_date": pendulum.DateTime(2016, 1, 1, 0, 0, 0, 
tzinfo=Timezone("UTC")),
+        },
+        {"data_interval": None, "logical_date": None},
+    ),
+    ids=["post-AIP-39", "pre-AIP-39-should-infer", "pre-AIP-39"],
+)
+def test_serialized_dag_get_run_data_interval(create_dag_run_kwargs, 
dag_maker, session):
+    """Test whether LazyDeserializedDAG can correctly get dag run data_interval
+
+    post-AIP-39: the dag run itself contains both data_interval start and 
data_interval end, and thus can
+        be retrieved directly
+    pre-AIP-39-should-infer: the dag run itself has neither 
data_interval_start nor data_interval_end,
+        and thus needs to infer the data_interval from its timetable
+    pre-AIP-39: the dag run itself has neither data_interval_start nor 
data_interval_end, and its logical_date
    is None. It should return data_interval as None
+    """
+    with dag_maker(dag_id="test_dag", session=session, serialized=True) as dag:
+        BaseOperator(task_id="test_task")
+    session.commit()
+
+    dr = dag_maker.create_dagrun(**create_dag_run_kwargs)
+    ser_dict = SerializedDAG.to_dict(dag)
+    deser_dag = LazyDeserializedDAG(data=ser_dict)
+    if "logical_date" in create_dag_run_kwargs and 
create_dag_run_kwargs["logical_date"] is None:
+        data_interval = deser_dag.get_run_data_interval(dr)
+        assert data_interval is None
+    else:
+        data_interval = deser_dag.get_run_data_interval(dr)
+        assert data_interval == DataInterval(
+            start=pendulum.DateTime(2015, 12, 31, 0, 0, 0, 
tzinfo=Timezone("UTC")),
+            end=pendulum.DateTime(2016, 1, 1, 0, 0, 0, tzinfo=Timezone("UTC")),
+        )
+
+
 def test_get_task_assets():
     asset1 = Asset("1")
     with DAG("testdag") as source_dag:
@@ -510,3 +559,28 @@ def test_get_task_assets():
         ("c", asset1),
         ("d", asset1),
     ]
+
+
+def test_lazy_dag_run_interval_wrong_dag():
+    lazy = LazyDeserializedDAG(data={"dag": {"dag_id": "dag1"}})
+
+    with pytest.raises(ValueError, match="different DAGs"):
+        lazy.get_run_data_interval(DAG_RUN)
+
+
+def test_lazy_dag_run_interval_missing_interval():
+    lazy = LazyDeserializedDAG(data={"dag": {"dag_id": "test_dag_id"}})
+
+    with pytest.raises(ValueError, match="Unsure how to deserialize version 
'<not present>'"):
+        lazy.get_run_data_interval(DAG_RUN)
+
+
+def test_lazy_dag_run_interval_success():
+    run = DAG_RUN
+    run.data_interval_start = datetime(2025, 1, 1)
+    run.data_interval_end = datetime(2025, 1, 2)
+
+    lazy = LazyDeserializedDAG(data={"dag": {"dag_id": "test_dag_id"}})
+    interval = lazy.get_run_data_interval(run)
+
+    assert isinstance(interval, DataInterval)

Reply via email to