Lee-W commented on code in PR #61702:
URL: https://github.com/apache/airflow/pull/61702#discussion_r2786064832


##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -418,6 +418,68 @@ def _generate_deadline_uuids(cls, dag_data: dict[str, Any]) -> dict[str, dict]:
 
         return uuid_mapping
 
+    @classmethod
+    def _try_reuse_deadline_uuids(
+        cls,
+        existing_deadline_uuids: list[str],
+        new_deadline_data: list[dict],
+        session: Session,
+    ) -> dict[str, dict] | None:
+        """
+        Try to reuse existing deadline UUIDs if the deadline definitions haven't changed.
+
+        Returns None if Deadline hashes are not all identical, indicating they need to be updated.
+
+        :param existing_deadline_uuids: List of UUID strings from existing serialized dag
+        :param new_deadline_data: List of new deadline alert data dicts from the DAG

Review Comment:
   ```suggestion
           :param new_deadline_data: List of new deadline alert data dicts from the Dag
   ```



##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -418,6 +418,68 @@ def _generate_deadline_uuids(cls, dag_data: dict[str, Any]) -> dict[str, dict]:
 
         return uuid_mapping
 
+    @classmethod
+    def _try_reuse_deadline_uuids(
+        cls,
+        existing_deadline_uuids: list[str],
+        new_deadline_data: list[dict],
+        session: Session,
+    ) -> dict[str, dict] | None:
+        """
+        Try to reuse existing deadline UUIDs if the deadline definitions haven't changed.
+
+        Returns None if Deadline hashes are not all identical, indicating they need to be updated.
+
+        :param existing_deadline_uuids: List of UUID strings from existing serialized dag
+        :param new_deadline_data: List of new deadline alert data dicts from the DAG
+        :param session: Database session
+        :return: UUID mapping dict if all match, None if any mismatch detected
+        """
+        if len(existing_deadline_uuids) != len(new_deadline_data):
+            return None
+
+        existing_alerts = session.scalars(
+            select(DeadlineAlertModel).where(DeadlineAlertModel.id.in_(existing_deadline_uuids))
+        ).all()
+
+        if len(existing_alerts) != len(existing_deadline_uuids):
+            return None
+
+        new_alerts_temp = []
+        for deadline_alert in new_deadline_data:
+            deadline_data = deadline_alert.get(Encoding.VAR, deadline_alert)
+            # Create a temporary alert for comparison
+            temp_alert = DeadlineAlertModel(
+                id="temp",  # id is required for the object but isn't included in the __eq__

Review Comment:
   Is it possible to have multiple DeadlineAlertModels with the same properties but different IDs?
   
   DeadlineAlertModel(id=1, reference="the same"...)
   DeadlineAlertModel(id=2, reference="the same"...)
   
   If it's possible, using `__eq__` looks incorrect to me. Comparing those attributes directly might seem lengthy, but it is probably more accurate and less confusing.
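   
   For illustration, a minimal sketch of what direct attribute comparison could look like. The helper name and the compared fields (`reference`, `interval`, `callback`) are assumptions based on the surrounding diff, not the actual DeadlineAlertModel definition:
   
   ```python
   def deadline_attrs_match(existing, candidate) -> bool:
       """Compare the deadline-defining attributes field by field instead of relying on __eq__."""
       # Hypothetical field names; adjust to whatever DeadlineAlertModel actually stores.
       # The id column is deliberately excluded from the comparison.
       return (
           existing.reference == candidate.reference
           and existing.interval == candidate.interval
           and existing.callback == candidate.callback
       )
   ```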



##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -503,9 +565,20 @@ def write_dag(
                 and existing_serialized_dag.data
                 and (existing_deadline_uuids := existing_serialized_dag.data.get("dag", {}).get("deadline"))
             ):
-                dag.data["dag"]["deadline"] = existing_deadline_uuids
-                deadline_uuid_mapping = {}
+                deadline_uuid_mapping = cls._try_reuse_deadline_uuids(
+                    existing_deadline_uuids,
+                    dag.data["dag"]["deadline"],
+                    session,
+                )
+
+                if deadline_uuid_mapping is not None:
+                    # All deadlines matched, reuse the UUIDs to preserve hash.
+                    dag.data["dag"]["deadline"] = list(deadline_uuid_mapping.keys())
+                else:
+                    # At least one deadline has changed, generate new UUIDs and update the hash.
+                    deadline_uuid_mapping = cls._generate_deadline_uuids(dag.data)
             else:
+                # First time seeing this DAG with deadlines, generate new UUIDs and update the hash.

Review Comment:
   ```suggestion
                   # First time seeing this Dag with deadlines, generate new UUIDs and update the hash.
   ```



##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -418,6 +418,68 @@ def _generate_deadline_uuids(cls, dag_data: dict[str, Any]) -> dict[str, dict]:
 
         return uuid_mapping
 
+    @classmethod
+    def _try_reuse_deadline_uuids(
+        cls,
+        existing_deadline_uuids: list[str],
+        new_deadline_data: list[dict],
+        session: Session,
+    ) -> dict[str, dict] | None:
+        """
+        Try to reuse existing deadline UUIDs if the deadline definitions haven't changed.
+
+        Returns None if Deadline hashes are not all identical, indicating they need to be updated.
+
+        :param existing_deadline_uuids: List of UUID strings from existing serialized dag

Review Comment:
   ```suggestion
           :param existing_deadline_uuids: List of UUID strings from existing serialized Dag
   ```



##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -1872,7 +1873,7 @@ def test_dagrun_deadline(self, reference_type, reference_column, testing_dag_bun
         assert dr.deadlines[0].deadline_time == getattr(dr, reference_column, DEFAULT_DATE) + interval
 
     def test_dag_with_multiple_deadlines(self, testing_dag_bundle, session):
-        """Test that a DAG with multiple deadlines stores all deadlines in the database."""
+        """Test that a DAG with multiple deadlines stores all deadlines and persists on re-serialization."""

Review Comment:
   ```suggestion
           """Test that a Dag with multiple deadlines stores all deadlines and persists on re-serialization."""
   ```



##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -1899,6 +1901,28 @@ def test_dag_with_multiple_deadlines(self, testing_dag_bundle, session):
 
         scheduler_dag = sync_dag_to_db(dag, session=session)
 
+        deadline_alerts = session.scalars(select(DeadlineAlertModel)).all()
+        assert len(deadline_alerts) == expected_num_deadlines
+        initial_uuids = {alert.id for alert in deadline_alerts}
+
+        # Re-serialize the DAG

Review Comment:
   ```suggestion
           # Re-serialize the Dag
   ```



##########
airflow-core/tests/unit/models/test_serialized_dag.py:
##########
@@ -753,3 +763,48 @@ def test_write_dag_atomicity_on_dagcode_failure(self, dag_maker, session):
                 assert len(sdag.dag.task_dict) == 1, (
                     "SerializedDagModel should not be updated when write fails"
                 )
+
+    def test_deadline_interval_change_triggers_new_serdag(self, testing_dag_bundle, session):
+        dag_id = "test_interval_change"
+
+        # Create a new Dag with a deadline and create a dagrun as a baseline..

Review Comment:
   ```suggestion
           # Create a new Dag with a deadline and create a dagrun as a baseline.
   ```



##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -503,9 +565,20 @@ def write_dag(
                 and existing_serialized_dag.data
                 and (existing_deadline_uuids := existing_serialized_dag.data.get("dag", {}).get("deadline"))
             ):
-                dag.data["dag"]["deadline"] = existing_deadline_uuids
-                deadline_uuid_mapping = {}
+                deadline_uuid_mapping = cls._try_reuse_deadline_uuids(
+                    existing_deadline_uuids,
+                    dag.data["dag"]["deadline"],
+                    session,
+                )
+
+                if deadline_uuid_mapping is not None:
+                    # All deadlines matched, reuse the UUIDs to preserve hash.
+                    dag.data["dag"]["deadline"] = list(deadline_uuid_mapping.keys())

Review Comment:
   If all matched, why don't we just use `existing_deadline_uuids`?
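   
   A sketch of that simplification (it assumes `_try_reuse_deadline_uuids` returns a mapping keyed by those same UUIDs in the same order, in which case the two spellings are equivalent):
   
   ```python
   if deadline_uuid_mapping is not None:
       # All deadlines matched; the stored UUIDs are still valid, so keep them as-is
       # rather than rebuilding the list from the mapping's keys.
       dag.data["dag"]["deadline"] = existing_deadline_uuids
   ```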


