ferruzzi commented on code in PR #61372:
URL: https://github.com/apache/airflow/pull/61372#discussion_r2776565559


##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -3088,3 +3093,90 @@ def 
test_when_dag_run_has_partition_and_downstreams_listening_then_tables_popula
     assert pakl.asset_partition_dag_run_id == apdr.id
     assert pakl.source_partition_key == "abc123"
     assert pakl.target_dag_id == "asset_event_listener"
+
+
+async def empty_callback_for_deadline():
+    """Used in deadline tests to confirm that Deadlines and DeadlineAlerts 
function correctly."""
+    pass
+
+
+def test_clear_task_instances_recalculates_dagrun_queued_deadlines(dag_maker, 
session):
+    """Test that clearing tasks recalculates all (and only) DAGRUN_QUEUED_AT 
deadlines."""
+    with dag_maker(
+        dag_id="test_recalculate_deadlines",
+        schedule=datetime.timedelta(days=1),
+    ) as dag:
+        EmptyOperator(task_id="task_1")
+
+    dag_run = dag_maker.create_dagrun()
+    # Set the task to SUCCESS state
+    ti = dag_run.get_task_instance("task_1", session=session)
+    ti.set_state(TaskInstanceState.SUCCESS, session=session)
+
+    original_queued_at = timezone.utcnow() - datetime.timedelta(hours=2)
+    dag_run.queued_at = original_queued_at
+    session.flush()
+
+    serialized_dag_id = session.scalar(
+        select(SerializedDagModel.id).where(SerializedDagModel.dag_id == 
dag.dag_id)
+    )
+
+    deadline_configs = [
+        (DeadlineReference.DAGRUN_QUEUED_AT, datetime.timedelta(hours=1)),
+        (DeadlineReference.DAGRUN_QUEUED_AT, datetime.timedelta(hours=2)),
+        (DeadlineReference.FIXED_DATETIME, datetime.timedelta(hours=1)),
+    ]
+
+    for deadline_type, interval in deadline_configs:
+        if deadline_type == DeadlineReference.DAGRUN_QUEUED_AT:
+            reference = 
DeadlineReference.DAGRUN_QUEUED_AT.serialize_reference()
+            deadline_time = dag_run.queued_at + interval
+        else:  # FIXED_DATETIME
+            future_date = timezone.utcnow() + datetime.timedelta(days=7)
+            reference = 
DeadlineReference.FIXED_DATETIME(future_date).serialize_reference()
+            deadline_time = future_date + interval
+
+        deadline_alert = DeadlineAlertModel(
+            serialized_dag_id=serialized_dag_id,
+            reference=reference,
+            interval=interval.total_seconds(),
+            callback_def={"path": f"{__name__}.empty_callback_for_deadline", 
"kwargs": {}},
+        )
+        session.add(deadline_alert)
+        session.flush()
+
+        deadline = Deadline(
+            dagrun_id=dag_run.id,
+            deadline_alert_id=deadline_alert.id,
+            deadline_time=deadline_time,
+            callback=AsyncCallback(empty_callback_for_deadline),
+            dag_id=dag_run.dag_id,
+        )
+        session.add(deadline)
+
+    session.flush()

Review Comment:
   See above. All three `session.flush()` calls are needed in order to assemble the building blocks.
   
   - Flush 1: Establishes test setup
   - Flush 2 (the one in the loop): Gets auto-generated IDs for foreign keys
   - Flush 3: Makes inserted rows visible for the query on line 3159
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to