This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a083930490 fix(dag): avoid getting dataset next run info for unresolved dataset alias (#41828)
a083930490 is described below

commit a083930490c33949377594cfbe02ed928d90d899
Author: Wei Lee <[email protected]>
AuthorDate: Fri Sep 20 01:17:45 2024 -0700

    fix(dag): avoid getting dataset next run info for unresolved dataset alias (#41828)
---
 airflow/models/dag.py        |  5 ++++-
 airflow/timetables/simple.py |  4 +++-
 tests/models/test_dag.py     | 16 ++++++++++++++++
 3 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 6447f7be15..388f322d2d 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -3274,7 +3274,10 @@ class DagModel(Base):
     def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> 
dict[str, int | str] | None:
         if self.dataset_expression is None:
             return None
-        return get_dataset_triggered_next_run_info([self.dag_id], 
session=session)[self.dag_id]
+
+        # When a dataset alias does not resolve into datasets, 
get_dataset_triggered_next_run_info returns
+        # an empty dict as there's no dataset info to get. This method should 
thus return None.
+        return get_dataset_triggered_next_run_info([self.dag_id], 
session=session).get(self.dag_id, None)
 
 
 # NOTE: Please keep the list of arguments in sync with DAG.__init__.
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index 1c54f4ddee..ad166a6413 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -161,6 +161,8 @@ class DatasetTriggeredTimetable(_TrivialTimetable):
     :meta private:
     """
 
+    UNRESOLVED_ALIAS_SUMMARY = "Unresolved DatasetAlias"
+
     description: str = "Triggered by datasets"
 
     def __init__(self, datasets: BaseDataset) -> None:
@@ -170,7 +172,7 @@ class DatasetTriggeredTimetable(_TrivialTimetable):
             self.dataset_condition = 
_DatasetAliasCondition(self.dataset_condition.name)
 
         if not next(self.dataset_condition.iter_datasets(), False):
-            self._summary = "Unresolved DatasetAlias"
+            self._summary = DatasetTriggeredTimetable.UNRESOLVED_ALIAS_SUMMARY
         else:
             self._summary = "Dataset"
 
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 093fdcae2f..d5fd2ad729 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3433,6 +3433,22 @@ def test_get_dataset_triggered_next_run_info(dag_maker, 
clear_datasets):
     }
 
 
+@pytest.mark.need_serialized_dag
+def 
test_get_dataset_triggered_next_run_info_with_unresolved_dataset_alias(dag_maker,
 clear_datasets):
+    dataset_alias1 = DatasetAlias(name="alias")
+    with dag_maker(dag_id="dag-1", schedule=[dataset_alias1]):
+        pass
+    dag1 = dag_maker.dag
+    session = dag_maker.session
+    session.flush()
+
+    info = get_dataset_triggered_next_run_info([dag1.dag_id], session=session)
+    assert info == {}
+
+    dag1_model = DagModel.get_dagmodel(dag1.dag_id)
+    assert dag1_model.get_dataset_triggered_next_run_info(session=session) is 
None
+
+
 def test_dag_uses_timetable_for_run_id(session):
     class CustomRunIdTimetable(Timetable):
         def generate_run_id(self, *, run_type, logical_date, data_interval, 
**extra) -> str:

Reply via email to