o-nikolas commented on code in PR #61372:
URL: https://github.com/apache/airflow/pull/61372#discussion_r2776285471


##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -181,6 +183,48 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance, 
task_teardown_map=None
             log.info("Not skipping teardown task '%s'", ti.task_id)
 
 
+def _recalculate_dagrun_queued_at_deadlines(
+    dagrun: DagRun, new_queued_at: datetime, session: Session
+) -> None:
+    """
+    Recalculate deadline times for deadlines that reference dagrun.queued_at.
+
+    :param dagrun: The DagRun whose deadlines should be recalculated
+    :param new_queued_at: The new queued_at timestamp to use for calculation
+    :param session: Database session
+
+    :meta private:
+    """
+    results = session.execute(
+        select(Deadline, DeadlineAlertModel)
+        .join(DeadlineAlertModel, Deadline.deadline_alert_id == 
DeadlineAlertModel.id)
+        .where(
+            Deadline.dagrun_id == dagrun.id,
+            Deadline.missed == false(),
+            
DeadlineAlertModel.reference[ReferenceModels.REFERENCE_TYPE_FIELD].as_string()
+            == ReferenceModels.DagRunQueuedAtDeadline.__name__,
+        )
+    ).all()
+
+    if not results:
+        return
+
+    for deadline, deadline_alert in results:
+        # We can't use evaluate_with() since the new queued_at is not written 
to the DB yet.
+        deadline_interval = timedelta(seconds=deadline_alert.interval)
+        new_deadline_time = new_queued_at + deadline_interval
+
+        log.info(

Review Comment:
   Don't feel strongly, just suggesting:
   
   
   ```suggestion
           log.debug(
   ```



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -274,6 +318,8 @@ def clear_task_instances(
             dr.clear_number += 1
             dr.queued_at = timezone.utcnow()
 
+            _recalculate_dagrun_queued_at_deadlines(dr, dr.queued_at, session)

Review Comment:
   I'm not familiar with the clearing logic/code. It looks like this thing is 
inside `clear_task_instances`. Is there another function for clearing the entire 
dag run that this should live in?



##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -3088,3 +3093,90 @@ def 
test_when_dag_run_has_partition_and_downstreams_listening_then_tables_popula
     assert pakl.asset_partition_dag_run_id == apdr.id
     assert pakl.source_partition_key == "abc123"
     assert pakl.target_dag_id == "asset_event_listener"
+
+
+async def empty_callback_for_deadline():
+    """Used in deadline tests to confirm that Deadlines and DeadlineAlerts 
function correctly."""
+    pass
+
+
+def test_clear_task_instances_recalculates_dagrun_queued_deadlines(dag_maker, 
session):
+    """Test that clearing tasks recalculates all (and only) DAGRUN_QUEUED_AT 
deadlines."""
+    with dag_maker(
+        dag_id="test_recalculate_deadlines",
+        schedule=datetime.timedelta(days=1),
+    ) as dag:
+        EmptyOperator(task_id="task_1")
+
+    dag_run = dag_maker.create_dagrun()
+    # Set the task to SUCCESS state
+    ti = dag_run.get_task_instance("task_1", session=session)
+    ti.set_state(TaskInstanceState.SUCCESS, session=session)
+
+    original_queued_at = timezone.utcnow() - datetime.timedelta(hours=2)
+    dag_run.queued_at = original_queued_at
+    session.flush()
+
+    serialized_dag_id = session.scalar(
+        select(SerializedDagModel.id).where(SerializedDagModel.dag_id == 
dag.dag_id)
+    )
+
+    deadline_configs = [
+        (DeadlineReference.DAGRUN_QUEUED_AT, datetime.timedelta(hours=1)),
+        (DeadlineReference.DAGRUN_QUEUED_AT, datetime.timedelta(hours=2)),
+        (DeadlineReference.FIXED_DATETIME, datetime.timedelta(hours=1)),
+    ]
+
+    for deadline_type, interval in deadline_configs:
+        if deadline_type == DeadlineReference.DAGRUN_QUEUED_AT:
+            reference = 
DeadlineReference.DAGRUN_QUEUED_AT.serialize_reference()
+            deadline_time = dag_run.queued_at + interval
+        else:  # FIXED_DATETIME
+            future_date = timezone.utcnow() + datetime.timedelta(days=7)
+            reference = 
DeadlineReference.FIXED_DATETIME(future_date).serialize_reference()
+            deadline_time = future_date + interval
+
+        deadline_alert = DeadlineAlertModel(
+            serialized_dag_id=serialized_dag_id,
+            reference=reference,
+            interval=interval.total_seconds(),
+            callback_def={"path": f"{__name__}.empty_callback_for_deadline", 
"kwargs": {}},
+        )
+        session.add(deadline_alert)
+        session.flush()
+
+        deadline = Deadline(
+            dagrun_id=dag_run.id,
+            deadline_alert_id=deadline_alert.id,
+            deadline_time=deadline_time,
+            callback=AsyncCallback(empty_callback_for_deadline),
+            dag_id=dag_run.dag_id,
+        )
+        session.add(deadline)
+
+    session.flush()

Review Comment:
   This is the third flush call so far for the setup. Can we have just this one 
at the end? Or does some of the other setup logic above depend on making DB 
queries?



##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -3088,3 +3093,90 @@ def 
test_when_dag_run_has_partition_and_downstreams_listening_then_tables_popula
     assert pakl.asset_partition_dag_run_id == apdr.id
     assert pakl.source_partition_key == "abc123"
     assert pakl.target_dag_id == "asset_event_listener"
+
+
+async def empty_callback_for_deadline():
+    """Used in deadline tests to confirm that Deadlines and DeadlineAlerts 
function correctly."""
+    pass
+
+
+def test_clear_task_instances_recalculates_dagrun_queued_deadlines(dag_maker, 
session):
+    """Test that clearing tasks recalculates all (and only) DAGRUN_QUEUED_AT 
deadlines."""
+    with dag_maker(
+        dag_id="test_recalculate_deadlines",
+        schedule=datetime.timedelta(days=1),
+    ) as dag:
+        EmptyOperator(task_id="task_1")
+
+    dag_run = dag_maker.create_dagrun()
+    # Set the task to SUCCESS state
+    ti = dag_run.get_task_instance("task_1", session=session)
+    ti.set_state(TaskInstanceState.SUCCESS, session=session)
+
+    original_queued_at = timezone.utcnow() - datetime.timedelta(hours=2)
+    dag_run.queued_at = original_queued_at
+    session.flush()
+
+    serialized_dag_id = session.scalar(
+        select(SerializedDagModel.id).where(SerializedDagModel.dag_id == 
dag.dag_id)
+    )
+
+    deadline_configs = [
+        (DeadlineReference.DAGRUN_QUEUED_AT, datetime.timedelta(hours=1)),
+        (DeadlineReference.DAGRUN_QUEUED_AT, datetime.timedelta(hours=2)),
+        (DeadlineReference.FIXED_DATETIME, datetime.timedelta(hours=1)),
+    ]
+
+    for deadline_type, interval in deadline_configs:
+        if deadline_type == DeadlineReference.DAGRUN_QUEUED_AT:
+            reference = 
DeadlineReference.DAGRUN_QUEUED_AT.serialize_reference()
+            deadline_time = dag_run.queued_at + interval
+        else:  # FIXED_DATETIME
+            future_date = timezone.utcnow() + datetime.timedelta(days=7)
+            reference = 
DeadlineReference.FIXED_DATETIME(future_date).serialize_reference()
+            deadline_time = future_date + interval
+
+        deadline_alert = DeadlineAlertModel(
+            serialized_dag_id=serialized_dag_id,
+            reference=reference,
+            interval=interval.total_seconds(),
+            callback_def={"path": f"{__name__}.empty_callback_for_deadline", 
"kwargs": {}},
+        )
+        session.add(deadline_alert)
+        session.flush()

Review Comment:
   Do we need a flush on each loop iteration?



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -181,6 +183,48 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance, 
task_teardown_map=None
             log.info("Not skipping teardown task '%s'", ti.task_id)
 
 
+def _recalculate_dagrun_queued_at_deadlines(
+    dagrun: DagRun, new_queued_at: datetime, session: Session
+) -> None:
+    """
+    Recalculate deadline times for deadlines that reference dagrun.queued_at.
+
+    :param dagrun: The DagRun whose deadlines should be recalculated
+    :param new_queued_at: The new queued_at timestamp to use for calculation
+    :param session: Database session
+
+    :meta private:
+    """
+    results = session.execute(
+        select(Deadline, DeadlineAlertModel)
+        .join(DeadlineAlertModel, Deadline.deadline_alert_id == 
DeadlineAlertModel.id)
+        .where(
+            Deadline.dagrun_id == dagrun.id,
+            Deadline.missed == false(),
+            
DeadlineAlertModel.reference[ReferenceModels.REFERENCE_TYPE_FIELD].as_string()
+            == ReferenceModels.DagRunQueuedAtDeadline.__name__,
+        )
+    ).all()
+
+    if not results:
+        return
+
+    for deadline, deadline_alert in results:
+        # We can't use evaluate_with() since the new queued_at is not written 
to the DB yet.
+        deadline_interval = timedelta(seconds=deadline_alert.interval)
+        new_deadline_time = new_queued_at + deadline_interval
+
+        log.info(
+            "Recalculating deadline %s for DagRun %s.%s: old=%s, new=%s",
+            deadline.id,
+            dagrun.dag_id,
+            dagrun.run_id,
+            deadline.deadline_time,
+            new_deadline_time,
+        )
+        deadline.deadline_time = new_deadline_time

Review Comment:
   Suggestion: Maybe leave a comment saying that the session is (or rather, 
must be) flushed by the calling function? It looks strange just having this 
assignment dangling at the end without a flush.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to