This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a083930490 fix(dag): avoid getting dataset next run info for unresolved dataset alias (#41828)
a083930490 is described below
commit a083930490c33949377594cfbe02ed928d90d899
Author: Wei Lee <[email protected]>
AuthorDate: Fri Sep 20 01:17:45 2024 -0700
fix(dag): avoid getting dataset next run info for unresolved dataset alias (#41828)
---
airflow/models/dag.py | 5 ++++-
airflow/timetables/simple.py | 4 +++-
tests/models/test_dag.py | 16 ++++++++++++++++
3 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 6447f7be15..388f322d2d 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -3274,7 +3274,10 @@ class DagModel(Base):
def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[str, int | str] | None:
if self.dataset_expression is None:
return None
- return get_dataset_triggered_next_run_info([self.dag_id], session=session)[self.dag_id]
+
+ # When a dataset alias does not resolve into datasets, get_dataset_triggered_next_run_info returns
+ # an empty dict as there's no dataset info to get. This method should thus return None.
+ return get_dataset_triggered_next_run_info([self.dag_id], session=session).get(self.dag_id, None)
# NOTE: Please keep the list of arguments in sync with DAG.__init__.
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index 1c54f4ddee..ad166a6413 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -161,6 +161,8 @@ class DatasetTriggeredTimetable(_TrivialTimetable):
:meta private:
"""
+ UNRESOLVED_ALIAS_SUMMARY = "Unresolved DatasetAlias"
+
description: str = "Triggered by datasets"
def __init__(self, datasets: BaseDataset) -> None:
@@ -170,7 +172,7 @@ class DatasetTriggeredTimetable(_TrivialTimetable):
self.dataset_condition = _DatasetAliasCondition(self.dataset_condition.name)
if not next(self.dataset_condition.iter_datasets(), False):
- self._summary = "Unresolved DatasetAlias"
+ self._summary = DatasetTriggeredTimetable.UNRESOLVED_ALIAS_SUMMARY
else:
self._summary = "Dataset"
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 093fdcae2f..d5fd2ad729 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3433,6 +3433,22 @@ def test_get_dataset_triggered_next_run_info(dag_maker, clear_datasets):
}
+@pytest.mark.need_serialized_dag
+def test_get_dataset_triggered_next_run_info_with_unresolved_dataset_alias(dag_maker, clear_datasets):
+ dataset_alias1 = DatasetAlias(name="alias")
+ with dag_maker(dag_id="dag-1", schedule=[dataset_alias1]):
+ pass
+ dag1 = dag_maker.dag
+ session = dag_maker.session
+ session.flush()
+
+ info = get_dataset_triggered_next_run_info([dag1.dag_id], session=session)
+ assert info == {}
+
+ dag1_model = DagModel.get_dagmodel(dag1.dag_id)
+ assert dag1_model.get_dataset_triggered_next_run_info(session=session) is None
+
+
def test_dag_uses_timetable_for_run_id(session):
class CustomRunIdTimetable(Timetable):
def generate_run_id(self, *, run_type, logical_date, data_interval, **extra) -> str: