prince8273 opened a new pull request, #67086:
URL: https://github.com/apache/airflow/pull/67086

   ## What
   
   Three deps in the scheduler loop (`DagUnpausedDep`, `DagTISlotsAvailableDep`,
   and `PoolSlotsAvailableDep`) each hit the DB once or twice per schedulable
   task instance, unconditionally. With 500 TIs that's ~2000 queries per
   scheduling loop before anything actually runs.
   
   This PR pre-fetches the data those three deps need with 3-4 `IN`-clause
   queries before the loop. Each dep reads from the cache and falls back
   directly to its original query if the cache isn't populated.
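To make the per-TI vs. batched difference concrete, here is a self-contained sketch using `sqlite3` and a simplified, hypothetical `dag(dag_id, is_paused)` table. It covers only the paused-DAG lookup and counts statements through a small hypothetical `run()` wrapper rather than a real scheduler session.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_paused INTEGER NOT NULL);
    INSERT INTO dag VALUES ('a', 0), ('b', 1), ('c', 0);
    """
)

query_count = 0


def run(sql, params=()):
    """Execute one statement and count it, standing in for a DB round trip."""
    global query_count
    query_count += 1
    return conn.execute(sql, params).fetchall()


# Hypothetical schedulable TIs as (dag_id, task_id) pairs.
tis = [("a", "t1"), ("a", "t2"), ("b", "t1"), ("c", "t1")]

# Before: one lookup per TI, even when several TIs share a DAG.
per_ti = {}
for dag_id, task_id in tis:
    (is_paused,) = run("SELECT is_paused FROM dag WHERE dag_id = ?", (dag_id,))[0]
    per_ti[(dag_id, task_id)] = bool(is_paused)
per_ti_queries = query_count

# After: one IN-clause query over the distinct dag_ids, then dict lookups.
query_count = 0
dag_ids = sorted({dag_id for dag_id, _ in tis})
placeholders = ", ".join("?" for _ in dag_ids)
paused = dict(run(f"SELECT dag_id, is_paused FROM dag WHERE dag_id IN ({placeholders})", dag_ids))
batched = {(dag_id, task_id): bool(paused[dag_id]) for dag_id, task_id in tis}
batched_queries = query_count

print(per_ti_queries, batched_queries)  # 4 1
```

Both paths produce the same answers; only the number of round trips changes, and the batched count stays at 1 no matter how many TIs are in the list.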
   
   The pattern already exists in `DepContext` (`ensure_finished_tis()`, used by
   `TriggerRuleDep`); this PR extends it.
   
   ## Numbers
   
   | N (schedulable TIs) | Queries before | Queries after |
   |---|---|---|
   | 50 | ~200 | 3-4 |
   | 200 | ~800 | 3-4 |
   | 500 | ~2000 | 3-4 |
   
   The before counts are analytical: `DagUnpausedDep` issues 1 query per TI
   (`dag_unpaused_dep.py:39`), `DagTISlotsAvailableDep` issues 1
   (`definitions/dag.py:486`), and `PoolSlotsAvailableDep` issues 2 via
   `pool.open_slots()` → `occupied_slots()` (`pool.py:255`), i.e. 4 queries per TI.
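Spelled out, the per-loop query count before this change grows linearly with the number of schedulable TIs $N$, while the prefetch cost does not depend on $N$ (the 3-4 figure is taken from the table above, not derived here):

```math
Q_{\text{before}}(N) = \big(\underbrace{1}_{\text{DagUnpausedDep}} + \underbrace{1}_{\text{DagTISlotsAvailableDep}} + \underbrace{2}_{\text{PoolSlotsAvailableDep}}\big)\,N = 4N,
\qquad Q_{\text{after}}(N) \in \{3, 4\}
```

For example, $Q_{\text{before}}(500) = 4 \cdot 500 = 2000$, matching the last row of the table.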
   
   ## Implementation notes
   
   Cache fields on `DepContext` are deliberately `init=True`.
   `are_dependencies_met()` calls `attrs.evolve()` for `UP_FOR_RESCHEDULE` tasks
   (`taskinstance.py:1090`), and `evolve()` only carries over `init=True` fields,
   so `init=False` fields would silently come back as their defaults.
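The `evolve()` pitfall can be reproduced with the standard library alone. This sketch uses `dataclasses.replace()` as a stand-in for `attrs.evolve()` (an assumption for illustration: both rebuild the instance through `__init__`, so a field excluded from `__init__` is not copied and comes back at its default). `CtxSketch` and its fields are hypothetical, not the real `DepContext`.

```python
from dataclasses import dataclass, field, replace
from typing import Optional


@dataclass
class CtxSketch:
    # Like a cache field declared init=False: never passed through __init__,
    # so replace() does not copy it and the new instance gets the default.
    hidden_cache: Optional[dict] = field(default=None, init=False)
    # Like a cache field declared init=True: replace() copies it over.
    visible_cache: Optional[dict] = None
    flag: bool = False


ctx = CtxSketch(visible_cache={"pool_a": 3})
ctx.hidden_cache = {"pool_a": 3}  # populated after construction

copy = replace(ctx, flag=True)
print(copy.visible_cache)  # {'pool_a': 3}
print(copy.hidden_cache)   # None: the populated cache was lost
```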
   
   The occupied-slot prefetch splits on `pool.include_deferred` so the counted
   states match exactly what `pool.get_occupied_states()` returns (`pool.py:275`).
   Unlimited pools (`slots == -1`) still get `float("inf")`.
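A minimal sketch of turning one batch of prefetched rows into per-pool open slots. It assumes rows shaped as `(pool_name, state, pool_slots)`, assumes `EXECUTION_STATES` is `{running, queued}`, and uses plain strings for states; `open_slots_by_pool()` and the row shape are hypothetical. Unlike the PR, which splits on `include_deferred` in the queries, this sketch applies the split in Python after fetching.

```python
from collections import defaultdict

# Assumed to mirror Airflow's EXECUTION_STATES; plain strings for illustration.
EXECUTION_STATES = frozenset({"running", "queued"})
DEFERRED = "deferred"


def occupied_states(include_deferred: bool) -> frozenset:
    # Deferred TIs only occupy slots in pools that opt in via include_deferred.
    return (EXECUTION_STATES | {DEFERRED}) if include_deferred else EXECUTION_STATES


def open_slots_by_pool(pools, ti_rows):
    """Hypothetical helper.

    pools:   {pool_name: (slots, include_deferred)}
    ti_rows: iterable of (pool_name, state, pool_slots), e.g. from one IN-clause query
    """
    occupied = defaultdict(int)
    for pool_name, state, pool_slots in ti_rows:
        if pool_name not in pools:
            continue
        _, include_deferred = pools[pool_name]
        if state in occupied_states(include_deferred):
            # Sum slot usage per TI, not a plain count of TIs.
            occupied[pool_name] += pool_slots
    return {
        name: float("inf") if slots == -1 else slots - occupied[name]
        for name, (slots, _) in pools.items()
    }


pools = {"default_pool": (128, False), "deferring_pool": (4, True), "unlimited": (-1, False)}
rows = [
    ("default_pool", "running", 1),
    ("default_pool", "deferred", 1),    # ignored: include_deferred is False
    ("deferring_pool", "deferred", 2),  # counted: include_deferred is True
    ("deferring_pool", "queued", 1),
    ("unlimited", "running", 5),
]
print(open_slots_by_pool(pools, rows))
# {'default_pool': 127, 'deferring_pool': 1, 'unlimited': inf}
```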
   
   An empty `schedulable_tis` hits the early return at line 1439, before the
   prefetch block, so no empty `IN ()` reaches the DB.
   
   Every cache read has a direct `else` branch that runs the original query, so
   anything calling these deps outside the scheduler loop (tests, sensors,
   `LocalTaskJob`) is unaffected.
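A minimal sketch of the read-with-fallback shape, assuming the cache is a plain dict that stays `None` unless the scheduler prefetched it. `ContextSketch`, `UnpausedDepSketch`, and the injected `query_is_paused` callable are hypothetical stand-ins, not the real `DepContext` or dep classes. The sketch also falls back when a key is missing from a populated cache; that is a defensive choice of the sketch, not something stated above.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class ContextSketch:
    # None means "not prefetched" (e.g. tests, sensors, LocalTaskJob).
    paused_by_dag_id: Optional[Dict[str, bool]] = None


class UnpausedDepSketch:
    def __init__(self, query_is_paused: Callable[[str], bool]) -> None:
        # Stands in for the original per-TI DB query.
        self._query_is_paused = query_is_paused

    def passes(self, dag_id: str, ctx: ContextSketch) -> bool:
        cache = ctx.paused_by_dag_id
        if cache is not None and dag_id in cache:
            is_paused = cache[dag_id]  # scheduler path: no DB round trip
        else:
            is_paused = self._query_is_paused(dag_id)  # original query
        return not is_paused


db_calls = []


def fake_query(dag_id: str) -> bool:
    # Records each "DB" hit; only DAG "b" is paused.
    db_calls.append(dag_id)
    return dag_id == "b"


dep = UnpausedDepSketch(fake_query)
prefetched = ContextSketch(paused_by_dag_id={"a": False, "b": True})
print(dep.passes("a", prefetched), dep.passes("b", prefetched), db_calls)  # True False []
print(dep.passes("b", ContextSketch()), db_calls)  # False ['b']
```

Keeping the fallback inline in each dep means callers that never populate the cache need no changes at all.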
   
   ## Files
   
   - `dep_context.py`: 4 cache fields
   - `dagrun.py`: `EXECUTION_STATES` import + prefetch block in `_get_ready_tis()`
   - `dag_unpaused_dep.py`, `dag_ti_slots_available_dep.py`,
     `pool_slots_available_dep.py`: cache read + fallback in each
   
   Also addresses the batch-dep coupling TODO in `ti_deps/dependencies_deps.py`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
