Lee-W commented on code in PR #61702:
URL: https://github.com/apache/airflow/pull/61702#discussion_r2786064832
##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -418,6 +418,68 @@ def _generate_deadline_uuids(cls, dag_data: dict[str,
Any]) -> dict[str, dict]:
return uuid_mapping
+ @classmethod
+ def _try_reuse_deadline_uuids(
+ cls,
+ existing_deadline_uuids: list[str],
+ new_deadline_data: list[dict],
+ session: Session,
+ ) -> dict[str, dict] | None:
+ """
+ Try to reuse existing deadline UUIDs if the deadline definitions
haven't changed.
+
+ Returns None if Deadline hashes are not all identical, indicating they
need to be updated.
+
+ :param existing_deadline_uuids: List of UUID strings from existing
serialized dag
+ :param new_deadline_data: List of new deadline alert data dicts from
the DAG
Review Comment:
```suggestion
:param new_deadline_data: List of new deadline alert data dicts from
the Dag
```
##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -418,6 +418,68 @@ def _generate_deadline_uuids(cls, dag_data: dict[str,
Any]) -> dict[str, dict]:
return uuid_mapping
+ @classmethod
+ def _try_reuse_deadline_uuids(
+ cls,
+ existing_deadline_uuids: list[str],
+ new_deadline_data: list[dict],
+ session: Session,
+ ) -> dict[str, dict] | None:
+ """
+ Try to reuse existing deadline UUIDs if the deadline definitions
haven't changed.
+
+ Returns None if Deadline hashes are not all identical, indicating they
need to be updated.
+
+ :param existing_deadline_uuids: List of UUID strings from existing
serialized dag
+ :param new_deadline_data: List of new deadline alert data dicts from
the DAG
+ :param session: Database session
+ :return: UUID mapping dict if all match, None if any mismatch detected
+ """
+ if len(existing_deadline_uuids) != len(new_deadline_data):
+ return None
+
+ existing_alerts = session.scalars(
+
select(DeadlineAlertModel).where(DeadlineAlertModel.id.in_(existing_deadline_uuids))
+ ).all()
+
+ if len(existing_alerts) != len(existing_deadline_uuids):
+ return None
+
+ new_alerts_temp = []
+ for deadline_alert in new_deadline_data:
+ deadline_data = deadline_alert.get(Encoding.VAR, deadline_alert)
+ # Create a temporary alert for comparison
+ temp_alert = DeadlineAlertModel(
+ id="temp", # id is required for the object but isn't included
in the __eq__
Review Comment:
Is it possible to have multiple DeadlineAlertModels with the same properties
but different IDs?
DeadlineAlertModel(id=1, reference="the same"...)
DeadlineAlertModel(id=2, reference="the same"...)
If that's possible, using `__eq__` here looks incorrect to me. Comparing the
relevant attributes directly might seem lengthy, but it is probably more
accurate and less confusing.
##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -503,9 +565,20 @@ def write_dag(
and existing_serialized_dag.data
and (existing_deadline_uuids :=
existing_serialized_dag.data.get("dag", {}).get("deadline"))
):
- dag.data["dag"]["deadline"] = existing_deadline_uuids
- deadline_uuid_mapping = {}
+ deadline_uuid_mapping = cls._try_reuse_deadline_uuids(
+ existing_deadline_uuids,
+ dag.data["dag"]["deadline"],
+ session,
+ )
+
+ if deadline_uuid_mapping is not None:
+ # All deadlines matched, reuse the UUIDs to preserve hash.
+ dag.data["dag"]["deadline"] =
list(deadline_uuid_mapping.keys())
+ else:
+ # At least one deadline has changed, generate new UUIDs
and update the hash.
+ deadline_uuid_mapping =
cls._generate_deadline_uuids(dag.data)
else:
+ # First time seeing this DAG with deadlines, generate new
UUIDs and update the hash.
Review Comment:
```suggestion
# First time seeing this Dag with deadlines, generate new
UUIDs and update the hash.
```
##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -418,6 +418,68 @@ def _generate_deadline_uuids(cls, dag_data: dict[str,
Any]) -> dict[str, dict]:
return uuid_mapping
+ @classmethod
+ def _try_reuse_deadline_uuids(
+ cls,
+ existing_deadline_uuids: list[str],
+ new_deadline_data: list[dict],
+ session: Session,
+ ) -> dict[str, dict] | None:
+ """
+ Try to reuse existing deadline UUIDs if the deadline definitions
haven't changed.
+
+ Returns None if Deadline hashes are not all identical, indicating they
need to be updated.
+
+ :param existing_deadline_uuids: List of UUID strings from existing
serialized dag
Review Comment:
```suggestion
:param existing_deadline_uuids: List of UUID strings from existing
serialized Dag
```
##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -1872,7 +1873,7 @@ def test_dagrun_deadline(self, reference_type,
reference_column, testing_dag_bun
assert dr.deadlines[0].deadline_time == getattr(dr, reference_column,
DEFAULT_DATE) + interval
def test_dag_with_multiple_deadlines(self, testing_dag_bundle, session):
- """Test that a DAG with multiple deadlines stores all deadlines in the
database."""
+ """Test that a DAG with multiple deadlines stores all deadlines and
persists on re-serialization."""
Review Comment:
```suggestion
"""Test that a Dag with multiple deadlines stores all deadlines and
persists on re-serialization."""
```
##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -1899,6 +1901,28 @@ def test_dag_with_multiple_deadlines(self,
testing_dag_bundle, session):
scheduler_dag = sync_dag_to_db(dag, session=session)
+ deadline_alerts = session.scalars(select(DeadlineAlertModel)).all()
+ assert len(deadline_alerts) == expected_num_deadlines
+ initial_uuids = {alert.id for alert in deadline_alerts}
+
+ # Re-serialize the DAG
Review Comment:
```suggestion
# Re-serialize the Dag
```
##########
airflow-core/tests/unit/models/test_serialized_dag.py:
##########
@@ -753,3 +763,48 @@ def test_write_dag_atomicity_on_dagcode_failure(self,
dag_maker, session):
assert len(sdag.dag.task_dict) == 1, (
"SerializedDagModel should not be updated when write fails"
)
+
+ def test_deadline_interval_change_triggers_new_serdag(self,
testing_dag_bundle, session):
+ dag_id = "test_interval_change"
+
+ # Create a new Dag with a deadline and create a dagrun as a baseline..
Review Comment:
```suggestion
# Create a new Dag with a deadline and create a dagrun as a baseline.
```
##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -503,9 +565,20 @@ def write_dag(
and existing_serialized_dag.data
and (existing_deadline_uuids :=
existing_serialized_dag.data.get("dag", {}).get("deadline"))
):
- dag.data["dag"]["deadline"] = existing_deadline_uuids
- deadline_uuid_mapping = {}
+ deadline_uuid_mapping = cls._try_reuse_deadline_uuids(
+ existing_deadline_uuids,
+ dag.data["dag"]["deadline"],
+ session,
+ )
+
+ if deadline_uuid_mapping is not None:
+ # All deadlines matched, reuse the UUIDs to preserve hash.
+ dag.data["dag"]["deadline"] =
list(deadline_uuid_mapping.keys())
Review Comment:
If all matched, why don't we just use `existing_deadline_uuids`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]