Hi everyone,

Proposing we extend Deadline Alerts to individual tasks, not just DAG runs.

## Why

The legacy SLA system was per-task. Deadline Alerts (AIP-86) only support
per-DAG-run deadlines. Users migrating from SLAs lose the ability to say "alert
me if this task runs too long": the closest per-task option,
`execution_timeout`, kills the task instead of just alerting.

## What It Looks Like

Same `DeadlineAlert`, just on the operator instead of the DAG:

```python
PythonOperator(
    task_id="validate_data",
    python_callable=validate_func,
    # Proposed: `deadline` is not yet a BaseOperator parameter.
    deadline=DeadlineAlert(
        reference=DeadlineReference.TASK_START_DATE,
        interval=timedelta(minutes=5),
        callback=AsyncCallback(
            SlackWebhookNotifier,
            kwargs={"text": "Task running long!"},
        ),
    ),
)
```

Four new reference types (task-level analogs of the existing DAG-run 
references):

  Reference             Based on                          Use case
  ────────────────────  ────────────────────────────────  ────────────────────
  TASK_START_DATE       TaskInstance.start_date           Execution duration
  TASK_QUEUED_DATE      TaskInstance.queued_dttm          Queue starvation
  TASK_LOGICAL_DATE     DagRun.logical_date               Legacy SLA migration
  TASK_AVERAGE_RUNTIME  Historical TaskInstance.duration  Anomaly detection


A task can also take a list of deadlines, same as at the DAG level.
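
To make the references in the table above concrete, here is a rough,
Airflow-free sketch of how each date-based reference could resolve to a due
time (reference timestamp + interval), with several deadlines on one task.
`TaskReference`, `TaskTimestamps`, and `TaskDeadline` are hypothetical names
for illustration only, not existing or proposed Airflow classes, and
TASK_AVERAGE_RUNTIME is left out because it needs historical data.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Optional


class TaskReference(Enum):
    # Hypothetical stand-ins for the proposed references. Each value names
    # the timestamp the reference is based on, per the table above.
    TASK_START_DATE = "start_date"
    TASK_QUEUED_DATE = "queued_dttm"
    TASK_LOGICAL_DATE = "logical_date"


@dataclass
class TaskTimestamps:
    # Simplified stand-in for the fields read from TaskInstance / DagRun.
    logical_date: datetime
    queued_dttm: Optional[datetime] = None
    start_date: Optional[datetime] = None


@dataclass
class TaskDeadline:
    reference: TaskReference
    interval: timedelta

    def due_at(self, ts: TaskTimestamps) -> Optional[datetime]:
        """Return reference + interval, or None if the reference is unset."""
        base = getattr(ts, self.reference.value)
        return None if base is None else base + self.interval


logical = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc)
ts = TaskTimestamps(
    logical_date=logical,
    queued_dttm=logical + timedelta(minutes=1),
    start_date=logical + timedelta(minutes=3),
)

# Several deadlines on the same task, each with its own reference.
deadlines = [
    TaskDeadline(TaskReference.TASK_START_DATE, timedelta(minutes=5)),
    TaskDeadline(TaskReference.TASK_QUEUED_DATE, timedelta(minutes=10)),
    TaskDeadline(TaskReference.TASK_LOGICAL_DATE, timedelta(hours=1)),
]
due_times = [d.due_at(ts) for d in deadlines]
```

A reference whose timestamp is not set yet (for example TASK_START_DATE while
the task is still queued) yields no due time in this sketch, which is why the
deadline rows would be created on the matching state transition.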

## Implementation

Straightforward extension of the existing system:

- Add `deadline` param to `BaseOperator`, serialize with the task definition
- Add `task_instance_id` FK and `resource_type` discriminator to the `deadline` 
table
- Create deadline rows on TaskInstance state transitions (QUEUED, RUNNING)
- Prune on terminal states, same pattern as `Deadline.prune_deadlines`
- Scheduler's existing `_check_and_handle_missed_deadlines` loop handles them 
with no changes
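
As a rough illustration of the lifecycle in the list above, here is an
in-memory sketch: rows are created on QUEUED/RUNNING transitions, pruned on
terminal states, and a single check finds overdue rows regardless of
`resource_type`. `DeadlineRow`, `DeadlineTable`, and their methods are
hypothetical stand-ins, not the real model or scheduler code, and the set of
terminal states is simplified.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple

# Simplified; the real set of terminal TaskInstance states is larger.
TERMINAL_STATES = {"success", "failed", "skipped"}


@dataclass
class DeadlineRow:
    # Hypothetical shape of a `deadline` row after the proposed change.
    resource_type: str                      # discriminator, e.g. "dagrun"
    deadline_time: datetime
    dagrun_id: Optional[int] = None
    task_instance_id: Optional[str] = None  # the proposed FK


class DeadlineTable:
    """In-memory stand-in for the `deadline` table."""

    def __init__(self) -> None:
        self.rows: List[DeadlineRow] = []

    def on_state_change(
        self,
        ti_id: str,
        new_state: str,
        when: datetime,
        alerts: List[Tuple[str, timedelta]],
    ) -> None:
        """alerts is a list of (trigger_state, interval) pairs for the task."""
        if new_state in TERMINAL_STATES:
            # Prune: the task finished, so its pending deadlines are moot.
            self.rows = [r for r in self.rows if r.task_instance_id != ti_id]
            return
        for trigger_state, interval in alerts:
            if trigger_state == new_state:
                self.rows.append(
                    DeadlineRow(
                        "task_instance", when + interval, task_instance_id=ti_id
                    )
                )

    def missed(self, now: datetime) -> List[DeadlineRow]:
        # Same query for every resource type, mirroring the idea that the
        # scheduler loop does not need to change.
        return [r for r in self.rows if r.deadline_time < now]


t0 = datetime(2025, 1, 1, tzinfo=timezone.utc)
table = DeadlineTable()
table.rows.append(DeadlineRow("dagrun", t0 + timedelta(hours=1), dagrun_id=1))

alerts = [("queued", timedelta(minutes=10)), ("running", timedelta(minutes=5))]
table.on_state_change("ti-1", "queued", t0, alerts)
table.on_state_change("ti-1", "running", t0 + timedelta(minutes=2), alerts)

overdue = table.missed(t0 + timedelta(minutes=8))   # running deadline missed
table.on_state_change("ti-1", "success", t0 + timedelta(minutes=9), alerts)
```

After the final transition only the DAG-run row remains, so task-level pruning
in this sketch leaves existing DAG-run deadlines alone.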

The `resource_type` column also sets up a clean path for asset-level deadlines 
later (just another column linking to the asset table + new reference types).

My plan is to prototype starting with TASK_START_DATE as the simplest reference 
type.

## References

  - AIP-86: 
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-86+Deadline+Alerts
  - Deadline Alerts tracking issue: 
https://github.com/apache/airflow/issues/62887
