o-nikolas commented on code in PR #61372:
URL: https://github.com/apache/airflow/pull/61372#discussion_r2776577637
##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -3088,3 +3093,90 @@ def
test_when_dag_run_has_partition_and_downstreams_listening_then_tables_popula
assert pakl.asset_partition_dag_run_id == apdr.id
assert pakl.source_partition_key == "abc123"
assert pakl.target_dag_id == "asset_event_listener"
+
+
+async def empty_callback_for_deadline():
+ """Used in deadline tests to confirm that Deadlines and DeadlineAlerts
function correctly."""
+ pass
+
+
+def test_clear_task_instances_recalculates_dagrun_queued_deadlines(dag_maker,
session):
+ """Test that clearing tasks recalculates all (and only) DAGRUN_QUEUED_AT
deadlines."""
+ with dag_maker(
+ dag_id="test_recalculate_deadlines",
+ schedule=datetime.timedelta(days=1),
+ ) as dag:
+ EmptyOperator(task_id="task_1")
+
+ dag_run = dag_maker.create_dagrun()
+ # Set the task to SUCCESS state
+ ti = dag_run.get_task_instance("task_1", session=session)
+ ti.set_state(TaskInstanceState.SUCCESS, session=session)
+
+ original_queued_at = timezone.utcnow() - datetime.timedelta(hours=2)
+ dag_run.queued_at = original_queued_at
+ session.flush()
+
+ serialized_dag_id = session.scalar(
+ select(SerializedDagModel.id).where(SerializedDagModel.dag_id ==
dag.dag_id)
+ )
+
+ deadline_configs = [
+ (DeadlineReference.DAGRUN_QUEUED_AT, datetime.timedelta(hours=1)),
+ (DeadlineReference.DAGRUN_QUEUED_AT, datetime.timedelta(hours=2)),
+ (DeadlineReference.FIXED_DATETIME, datetime.timedelta(hours=1)),
+ ]
+
+ for deadline_type, interval in deadline_configs:
+ if deadline_type == DeadlineReference.DAGRUN_QUEUED_AT:
+ reference =
DeadlineReference.DAGRUN_QUEUED_AT.serialize_reference()
+ deadline_time = dag_run.queued_at + interval
+ else: # FIXED_DATETIME
+ future_date = timezone.utcnow() + datetime.timedelta(days=7)
+ reference =
DeadlineReference.FIXED_DATETIME(future_date).serialize_reference()
+ deadline_time = future_date + interval
+
+ deadline_alert = DeadlineAlertModel(
+ serialized_dag_id=serialized_dag_id,
+ reference=reference,
+ interval=interval.total_seconds(),
+ callback_def={"path": f"{__name__}.empty_callback_for_deadline",
"kwargs": {}},
+ )
+ session.add(deadline_alert)
+ session.flush()
+
+ deadline = Deadline(
+ dagrun_id=dag_run.id,
+ deadline_alert_id=deadline_alert.id,
+ deadline_time=deadline_time,
+ callback=AsyncCallback(empty_callback_for_deadline),
+ dag_id=dag_run.dag_id,
+ )
+ session.add(deadline)
+
+ session.flush()
Review Comment:
Fair enough. It's an expensive test, but it's only one, I suppose!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]