kaxil opened a new pull request, #60951:
URL: https://github.com/apache/airflow/pull/60951
Optimizes `RenderedTaskInstanceFields.delete_old_records()` to avoid
scanning the RTIF table when determining which records to keep. This fixes
scheduler crashes caused by slow RTIF cleanup queries on DAGs with many
dynamically mapped tasks.
**Problem**: For DAGs with 3k+ mapped task instances, the delete query was
scanning ~100k RTIF records with a complex `NOT EXISTS` subquery containing a
join, causing statement timeouts and high CPU utilization that prevented the
scheduler from heartbeating.
This PR simplifies the query to only use the `dag_run` table for finding
recent run_ids, avoiding the expensive RTIF table scan entirely.
---
### Before (slow - scans RTIF table with join)
```sql
DELETE FROM rendered_task_instance_fields
WHERE dag_id = :dag_id AND task_id = :task_id
  AND NOT EXISTS (
    SELECT 1 FROM (
        SELECT DISTINCT rtif.dag_id, rtif.task_id, rtif.run_id, dr.logical_date
        FROM rendered_task_instance_fields rtif
        JOIN dag_run dr ON rtif.dag_id = dr.dag_id AND rtif.run_id = dr.run_id
        WHERE rtif.dag_id = :dag_id AND rtif.task_id = :task_id
        ORDER BY dr.logical_date DESC LIMIT 30
    ) AS keep
    WHERE keep.dag_id = rtif.dag_id AND keep.task_id = rtif.task_id
      AND keep.run_id = rtif.run_id
  )
```
### After (fast - only queries dag_run table)
```sql
DELETE FROM rendered_task_instance_fields
WHERE dag_id = :dag_id AND task_id = :task_id
  AND run_id NOT IN (
    SELECT run_id FROM dag_run
    WHERE dag_id = :dag_id
    ORDER BY run_after DESC LIMIT 30
  )
```
## Benchmark Results (PostgreSQL)
**Test Setup**: 100 DAG runs x 50 mapped TIs = 5,000 RTIF records, keeping
30 most recent runs
| Metric | Old Query | New Query | Improvement |
|--------|-----------|-----------|-------------|
| Execution Time | 233 ms | 5.5 ms | **42x faster** |
| Buffer Hits | 14,618 | 113 | 129x fewer |
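If you want to capture the same timing and buffer figures yourself, PostgreSQL's `EXPLAIN (ANALYZE, BUFFERS)` reports both. A minimal sketch that builds the wrapped statement for the new delete (the ids and `LIMIT` are the benchmark values from the script below, not production settings):

```python
# Sketch: wrap the new delete in EXPLAIN (ANALYZE, BUFFERS) to get
# "Execution Time" and "Buffers: shared hit=..." from PostgreSQL.
# DAG_ID / TASK_ID / NUM_TO_KEEP mirror the benchmark script's constants.
DAG_ID = "benchmark_dag"
TASK_ID = "benchmark_task"
NUM_TO_KEEP = 30

new_delete = f"""EXPLAIN (ANALYZE, BUFFERS)
DELETE FROM rendered_task_instance_fields
WHERE dag_id = '{DAG_ID}' AND task_id = '{TASK_ID}'
  AND run_id NOT IN (
    SELECT run_id FROM dag_run
    WHERE dag_id = '{DAG_ID}'
    ORDER BY run_after DESC LIMIT {NUM_TO_KEEP}
  )"""

# Executing this via session.execute(text(new_delete)) returns the plan
# rows shown in the comparison below.
print(new_delete.splitlines()[0])  # EXPLAIN (ANALYZE, BUFFERS)
```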
### Query Plan Comparison
**Old Query** - Nested loop with expensive heap fetches:
```
Delete on rendered_task_instance_fields (actual time=233.12..233.12 rows=0 loops=1)
  Buffers: shared hit=14618
  ->  Nested Loop Anti Join (actual time=51.43..232.91 rows=3500 loops=1)
        Rows Removed by Join Filter: 126750
        ->  Index Scan using rendered_task_instance_fields_pkey ...
        ->  Subquery Scan on anon_1 (actual time=0.010..0.065 rows=26 loops=5000)
              ->  Nested Loop (joins RTIF to dag_run)
                    Rows Removed by Join Filter: 495000
Execution Time: 233.44 ms
**New Query** - Simple index scan with hashed subplan:
```
Delete on rendered_task_instance_fields (actual time=5.12..5.12 rows=0 loops=1)
  Buffers: shared hit=113
  ->  Index Scan using rendered_task_instance_fields_pkey ...
        Filter: (NOT (hashed SubPlan 1))
        SubPlan 1
          ->  Limit (cost=0.14..8.15 rows=1 width=524)
                ->  Index Scan Backward using idx_dag_run_run_after on dag_run
Execution Time: 5.55 ms
```
## Behavioral Change
The semantics change slightly:
| Scenario | Old Behavior | New Behavior |
|----------|-------------|--------------|
| Normal task (runs every dag run) | Keep last 30 runs | Keep last 30 runs |
| Sparse task (runs occasionally) | Keep last 30 executions | Keep records from last 30 dag runs |
IMO this is acceptable for the reasons below, but I'd welcome other opinions:
1. Most tasks run every DAG run (not sparse)
2. The purpose is to limit storage, not preserve specific historical records
3. The performance improvement (42x) far outweighs this edge case
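To make the sparse-task edge case concrete, here is a small self-contained sketch of the two retention semantics. The toy numbers (100 runs, a task that only executes in every 5th run) are illustrative, not from the benchmark:

```python
# Contrast the old vs new retention semantics for a sparse task.
NUM_TO_KEEP = 30

all_runs = [f"run_{i:04d}" for i in range(100)]  # oldest first, newest last
# Sparse task: only executed in every 5th dag run -> 20 executions total.
task_runs = [r for i, r in enumerate(all_runs) if i % 5 == 0]

# Old semantics: keep the task's own 30 most recent executions.
old_kept = set(task_runs[-NUM_TO_KEEP:])

# New semantics: keep records whose run_id is among the DAG's 30 most recent runs.
recent_runs = set(all_runs[-NUM_TO_KEEP:])
new_kept = {r for r in task_runs if r in recent_runs}

print(len(old_kept))  # 20 -> all executions kept (fewer than 30 exist)
print(len(new_kept))  # 6  -> only executions inside the last 30 runs survive
```

For tasks that run in every dag run the two sets are identical; the difference only shows up for sparse tasks, where the new query keeps fewer (but more recent) records.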
## Additional Changes
- Uses `run_after` instead of `logical_date` for ordering (since
`logical_date` can be NULL for manual runs)
- Updated docstrings, config description, and documentation
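A small illustrative sketch of why ordering by `logical_date` is fragile for this query (the run records are hypothetical; note that PostgreSQL sorts NULLs first by default on `DESC`, so NULL `logical_date` rows from manual runs would always sort to the top):

```python
# Why order by run_after instead of logical_date: logical_date can be NULL
# for manually triggered runs, so ordering by it is backend-dependent in SQL
# (and a plain TypeError in Python). run_after is always populated.
from datetime import datetime, timezone

runs = [
    {"run_id": "scheduled_1",
     "logical_date": datetime(2024, 1, 1, tzinfo=timezone.utc),
     "run_after": datetime(2024, 1, 1, tzinfo=timezone.utc)},
    {"run_id": "manual_1",
     "logical_date": None,  # manual runs may have no logical_date
     "run_after": datetime(2024, 1, 2, tzinfo=timezone.utc)},
]

# Ordering by run_after DESC is well-defined for every run.
ordered = sorted(runs, key=lambda r: r["run_after"], reverse=True)
print([r["run_id"] for r in ordered])  # ['manual_1', 'scheduled_1']
```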
## How to Reproduce
To reproduce the benchmark, save this script and run inside breeze with
PostgreSQL:
```bash
breeze run --backend postgres python dev/rtif_benchmark.py
```
<details>
<summary>Benchmark Script</summary>

```python
"""Benchmark: RTIF delete_old_records query performance.

Run: breeze run --backend postgres python dev/rtif_benchmark.py
"""

from __future__ import annotations

import time
from datetime import datetime, timedelta, timezone

from sqlalchemy import delete, exists, select, text

from airflow.models.dagrun import DagRun
from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState

DAG_ID = "benchmark_dag"
TASK_ID = "benchmark_task"
NUM_RUNS = 100
MAP_INDEX_COUNT = 50
NUM_TO_KEEP = 30


def setup_test_data():
    from airflow.utils.db import initdb

    initdb()
    with create_session() as session:
        # Start from a clean slate for the benchmark DAG.
        session.execute(text(f"DELETE FROM rendered_task_instance_fields WHERE dag_id = '{DAG_ID}'"))
        session.execute(text(f"DELETE FROM dag_run WHERE dag_id = '{DAG_ID}'"))
        # Drop the TI FK so RTIF rows can be inserted without task instances.
        session.execute(text("ALTER TABLE rendered_task_instance_fields DROP CONSTRAINT IF EXISTS rtif_ti_fkey"))
        session.commit()

        base_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
        for run_num in range(NUM_RUNS):
            run_id = f"run_{run_num:04d}"
            logical_date = base_date + timedelta(days=run_num)
            dr = DagRun(
                dag_id=DAG_ID,
                run_id=run_id,
                logical_date=logical_date,
                data_interval=(logical_date, logical_date + timedelta(days=1)),
                run_type="scheduled",
                state=DagRunState.SUCCESS,
            )
            session.add(dr)
            session.flush()
            for map_idx in range(MAP_INDEX_COUNT):
                session.execute(
                    text(
                        """
                        INSERT INTO rendered_task_instance_fields
                            (dag_id, task_id, run_id, map_index, rendered_fields)
                        VALUES (:dag_id, :task_id, :run_id, :map_index, :fields)
                        """
                    ),
                    {
                        "dag_id": DAG_ID,
                        "task_id": TASK_ID,
                        "run_id": run_id,
                        "map_index": map_idx,
                        "fields": '{"cmd": "echo test"}',
                    },
                )
        session.commit()
    print(f"Created {NUM_RUNS} runs x {MAP_INDEX_COUNT} TIs = {NUM_RUNS * MAP_INDEX_COUNT} RTIF records")


def old_query():
    """Old query with NOT EXISTS and JOIN."""
    tis_to_keep = (
        select(RTIF.dag_id, RTIF.task_id, RTIF.run_id, DagRun.logical_date)
        .where(RTIF.dag_id == DAG_ID, RTIF.task_id == TASK_ID)
        .join(RTIF.dag_run)
        .distinct()
        .order_by(DagRun.logical_date.desc())
        .limit(NUM_TO_KEEP)
    ).subquery()
    return delete(RTIF).where(
        RTIF.dag_id == DAG_ID,
        RTIF.task_id == TASK_ID,
        ~exists().where(
            tis_to_keep.c.dag_id == RTIF.dag_id,
            tis_to_keep.c.task_id == RTIF.task_id,
            tis_to_keep.c.run_id == RTIF.run_id,
        ),
    )


def new_query():
    """New query with NOT IN on dag_run only."""
    run_ids = (
        select(DagRun.run_id)
        .where(DagRun.dag_id == DAG_ID)
        .order_by(DagRun.run_after.desc())
        .limit(NUM_TO_KEEP)
    )
    return delete(RTIF).where(
        RTIF.dag_id == DAG_ID,
        RTIF.task_id == TASK_ID,
        RTIF.run_id.not_in(run_ids.scalar_subquery()),
    )


def benchmark(session, stmt, label):
    start = time.perf_counter()
    result = session.execute(stmt)
    elapsed = time.perf_counter() - start
    print(f"{label}: {result.rowcount} rows in {elapsed * 1000:.2f} ms")
    return elapsed


def main():
    setup_test_data()
    with create_session() as session:
        old_time = benchmark(session, old_query(), "OLD")
        session.commit()

    setup_test_data()
    with create_session() as session:
        new_time = benchmark(session, new_query(), "NEW")
        session.commit()

    print(f"\nSpeedup: {old_time / new_time:.1f}x faster")


if __name__ == "__main__":
    main()
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]