kaxil opened a new pull request, #60951:
URL: https://github.com/apache/airflow/pull/60951

   Optimizes `RenderedTaskInstanceFields.delete_old_records()` to avoid 
scanning the RTIF table when determining which records to keep. This fixes 
scheduler crashes caused by slow RTIF cleanup queries on DAGs with many 
dynamically mapped tasks.
   
   **Problem**: For DAGs with 3k+ mapped task instances, the delete query was 
scanning ~100k RTIF records with a complex `NOT EXISTS` subquery containing a 
join, causing statement timeouts and high CPU utilization that prevented the 
scheduler from heartbeating.
   
   This PR simplifies the query to only use the `dag_run` table for finding 
recent run_ids, avoiding the expensive RTIF table scan entirely.
   
   ---
   
   ### Before (slow - scans RTIF table with join)
   
   ```sql
   DELETE FROM rendered_task_instance_fields 
   WHERE dag_id = :dag_id AND task_id = :task_id 
   AND NOT EXISTS (
     SELECT 1 FROM (
       SELECT DISTINCT rtif.dag_id, rtif.task_id, rtif.run_id, dr.logical_date 
       FROM rendered_task_instance_fields rtif
       JOIN dag_run dr ON rtif.dag_id = dr.dag_id AND rtif.run_id = dr.run_id 
       WHERE rtif.dag_id = :dag_id AND rtif.task_id = :task_id 
       ORDER BY dr.logical_date DESC LIMIT 30
     ) AS keep
     WHERE keep.dag_id = rtif.dag_id AND keep.task_id = rtif.task_id AND 
keep.run_id = rtif.run_id
   )
   ```
   
   ### After (fast - only queries dag_run table)
   
   ```sql
   DELETE FROM rendered_task_instance_fields 
   WHERE dag_id = :dag_id AND task_id = :task_id 
   AND run_id NOT IN (
     SELECT run_id FROM dag_run 
     WHERE dag_id = :dag_id 
     ORDER BY run_after DESC LIMIT 30
   )
   ```
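   For a quick sanity check of the new query shape, here is a minimal sketch against an in-memory SQLite database (toy schema and data, not the real Airflow tables, and a cap of 3 instead of 30 to keep it readable):

   ```python
   import sqlite3

   conn = sqlite3.connect(":memory:")
   conn.executescript("""
       CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, run_after INTEGER);
       CREATE TABLE rendered_task_instance_fields (dag_id TEXT, task_id TEXT, run_id TEXT);
   """)
   # 10 runs, one RTIF row per run for a single task
   for i in range(10):
       conn.execute("INSERT INTO dag_run VALUES ('d', ?, ?)", (f"run_{i}", i))
       conn.execute("INSERT INTO rendered_task_instance_fields VALUES ('d', 't', ?)", (f"run_{i}",))

   NUM_TO_KEEP = 3
   # Same shape as the new query: decide what "recent" means by looking only at
   # dag_run, then prune every RTIF row outside that set.
   conn.execute(
       """
       DELETE FROM rendered_task_instance_fields
       WHERE dag_id = 'd' AND task_id = 't'
       AND run_id NOT IN (
           SELECT run_id FROM dag_run WHERE dag_id = 'd'
           ORDER BY run_after DESC LIMIT ?
       )
       """,
       (NUM_TO_KEEP,),
   )
   kept = sorted(r[0] for r in conn.execute("SELECT run_id FROM rendered_task_instance_fields"))
   print(kept)  # ['run_7', 'run_8', 'run_9']
   ```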
   
   ## Benchmark Results (PostgreSQL)
   
   **Test Setup**: 100 DAG runs x 50 mapped TIs = 5,000 RTIF records, keeping 
30 most recent runs
   
   | Metric | Old Query | New Query | Improvement |
   |--------|-----------|-----------|-------------|
   | Execution Time | 233 ms | 5.5 ms | **42x faster** |
   | Buffer Hits | 14,618 | 113 | 129x fewer |
   
   ### Query Plan Comparison
   
   **Old Query** - Nested loop with expensive heap fetches:
   ```
    Delete on rendered_task_instance_fields (actual time=233.12..233.12 rows=0 loops=1)
     Buffers: shared hit=14618
     ->  Nested Loop Anti Join (actual time=51.43..232.91 rows=3500 loops=1)
           Rows Removed by Join Filter: 126750
           ->  Index Scan using rendered_task_instance_fields_pkey ...
            ->  Subquery Scan on anon_1 (actual time=0.010..0.065 rows=26 loops=5000)
                 ->  Nested Loop (joins RTIF to dag_run)
                       Rows Removed by Join Filter: 495000
   Execution Time: 233.44 ms
   ```
   
   **New Query** - Simple index scan with hashed subplan:
   ```
    Delete on rendered_task_instance_fields (actual time=5.12..5.12 rows=0 loops=1)
     Buffers: shared hit=113
     ->  Index Scan using rendered_task_instance_fields_pkey ...
           Filter: (NOT (hashed SubPlan 1))
           SubPlan 1
             ->  Limit (cost=0.14..8.15 rows=1 width=524)
                   ->  Index Scan Backward using idx_dag_run_run_after on 
dag_run
   Execution Time: 5.55 ms
   ```
   
   ## Behavioral Change
   
   The semantics change slightly:
   
   | Scenario | Old Behavior | New Behavior |
   |----------|-------------|--------------|
   | Normal task (runs every dag run) | Keep last 30 runs | Keep last 30 runs |
    | Sparse task (runs occasionally) | Keep last 30 executions | Keep records from last 30 dag runs |
   
    IMO this is acceptable for the reasons below, but I'd like to hear what others think:
   1. Most tasks run every DAG run (not sparse)
   2. The purpose is to limit storage, not preserve specific historical records
   3. The performance improvement (42x) far outweighs this edge case
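   To make the sparse-task edge case concrete, here is a toy model in plain Python (no Airflow involved, and a cap of 3 instead of 30 for readability) comparing the two retention policies for a task that only ran in every other DAG run:

   ```python
   NUM_TO_KEEP = 3  # small cap for readability; the real default is 30

   dag_runs = [f"run_{i}" for i in range(10)]  # oldest first, newest last
   rtif_runs = dag_runs[::2]                   # sparse task: ran in every other dag run

   # Old policy: keep the task's own NUM_TO_KEEP most recent executions.
   old_kept = set(rtif_runs[-NUM_TO_KEEP:])

   # New policy: keep whatever records fall inside the NUM_TO_KEEP most recent dag runs.
   recent_runs = set(dag_runs[-NUM_TO_KEEP:])
   new_kept = {r for r in rtif_runs if r in recent_runs}

   print(sorted(old_kept))  # ['run_4', 'run_6', 'run_8']
   print(sorted(new_kept))  # ['run_8'] -- a sparse task may retain fewer records
   ```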
   
   ## Additional Changes
   
   - Uses `run_after` instead of `logical_date` for ordering (since 
`logical_date` can be NULL for manual runs)
   - Updated docstrings, config description, and documentation
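   A small illustration of why `logical_date` is an unreliable sort key once it can be NULL (toy SQLite schema; note that NULL placement under `ORDER BY ... DESC` is engine-specific -- SQLite sorts NULLs last here, while PostgreSQL defaults to NULLS FIRST -- so the exact misbehavior differs, but either way NULL rows are ordered by something other than recency):

   ```python
   import sqlite3

   conn = sqlite3.connect(":memory:")
   conn.execute("CREATE TABLE dag_run (run_id TEXT, logical_date INTEGER, run_after INTEGER)")
   conn.executemany("INSERT INTO dag_run VALUES (?, ?, ?)", [
       ("scheduled_1", 1, 1),
       ("scheduled_2", 2, 2),
       ("manual_3", None, 3),  # manual run: no logical_date, but run_after is set
   ])

   by_logical = [r[0] for r in conn.execute(
       "SELECT run_id FROM dag_run ORDER BY logical_date DESC LIMIT 2")]
   by_run_after = [r[0] for r in conn.execute(
       "SELECT run_id FROM dag_run ORDER BY run_after DESC LIMIT 2")]

   print(by_logical)    # ['scheduled_2', 'scheduled_1'] -- the newest run falls outside the window
   print(by_run_after)  # ['manual_3', 'scheduled_2']
   ```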
   
   ## How to Reproduce
   
   To reproduce the benchmark, save the script below as `dev/rtif_benchmark.py` and run it inside Breeze with PostgreSQL:
   
   ```bash
   breeze run --backend postgres python dev/rtif_benchmark.py
   ```
   
   <details>
   <summary>Benchmark Script</summary>
   
   ```python
   """
   Benchmark: RTIF delete_old_records query performance.
   Run: breeze run --backend postgres python dev/rtif_benchmark.py
   """
   from __future__ import annotations
   
   import time
   from datetime import datetime, timedelta, timezone
   
   from sqlalchemy import delete, exists, select, text
   
   from airflow.models.dagrun import DagRun
    from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
   from airflow.utils.session import create_session
   from airflow.utils.state import DagRunState
   
   DAG_ID = "benchmark_dag"
   TASK_ID = "benchmark_task"
   NUM_RUNS = 100
   MAP_INDEX_COUNT = 50
   NUM_TO_KEEP = 30
   
   
   def setup_test_data():
       from airflow.utils.db import initdb
       initdb()
   
       with create_session() as session:
            session.execute(text(f"DELETE FROM rendered_task_instance_fields WHERE dag_id = '{DAG_ID}'"))
            session.execute(text(f"DELETE FROM dag_run WHERE dag_id = '{DAG_ID}'"))
            session.execute(text("ALTER TABLE rendered_task_instance_fields DROP CONSTRAINT IF EXISTS rtif_ti_fkey"))
           session.commit()
   
           base_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
           for run_num in range(NUM_RUNS):
               run_id = f"run_{run_num:04d}"
               logical_date = base_date + timedelta(days=run_num)
                dr = DagRun(dag_id=DAG_ID, run_id=run_id, logical_date=logical_date,
                            data_interval=(logical_date, logical_date + timedelta(days=1)),
                            run_type="scheduled", state=DagRunState.SUCCESS)
               session.add(dr)
               session.flush()
   
               for map_idx in range(MAP_INDEX_COUNT):
                   session.execute(text("""
                       INSERT INTO rendered_task_instance_fields 
                       (dag_id, task_id, run_id, map_index, rendered_fields)
                       VALUES (:dag_id, :task_id, :run_id, :map_index, :fields)
                   """), {"dag_id": DAG_ID, "task_id": TASK_ID, "run_id": 
run_id, 
                          "map_index": map_idx, "fields": '{"cmd": "echo 
test"}'})
           session.commit()
       print(f"Created {NUM_RUNS} runs x {MAP_INDEX_COUNT} TIs = {NUM_RUNS * 
MAP_INDEX_COUNT} RTIF records")
   
   
   def old_query():
       """Old query with NOT EXISTS and JOIN."""
       tis_to_keep = (
           select(RTIF.dag_id, RTIF.task_id, RTIF.run_id, DagRun.logical_date)
           .where(RTIF.dag_id == DAG_ID, RTIF.task_id == TASK_ID)
           .join(RTIF.dag_run).distinct()
           .order_by(DagRun.logical_date.desc()).limit(NUM_TO_KEEP)
       ).subquery()
       return delete(RTIF).where(
           RTIF.dag_id == DAG_ID, RTIF.task_id == TASK_ID,
           ~exists(1).where(tis_to_keep.c.dag_id == RTIF.dag_id, 
                            tis_to_keep.c.task_id == RTIF.task_id, 
                            tis_to_keep.c.run_id == RTIF.run_id))
   
   
   def new_query():
       """New query with NOT IN on dag_run only."""
       run_ids = select(DagRun.run_id).where(DagRun.dag_id == DAG_ID).order_by(
           DagRun.run_after.desc()).limit(NUM_TO_KEEP)
       return delete(RTIF).where(
           RTIF.dag_id == DAG_ID, RTIF.task_id == TASK_ID,
           RTIF.run_id.not_in(run_ids.scalar_subquery()))
   
   
   def benchmark(session, stmt, label):
       start = time.perf_counter()
       result = session.execute(stmt)
       elapsed = time.perf_counter() - start
       print(f"{label}: {result.rowcount} rows in {elapsed*1000:.2f} ms")
       return elapsed
   
   
   def main():
       setup_test_data()
       with create_session() as session:
           old_time = benchmark(session, old_query(), "OLD")
           session.commit()
   
       setup_test_data()
       with create_session() as session:
           new_time = benchmark(session, new_query(), "NEW")
           session.commit()
   
       print(f"\nSpeedup: {old_time/new_time:.1f}x faster")
   
   
   if __name__ == "__main__":
       main()
   ```
   
   </details>

