GitHub user Asquator edited a comment on the discussion: Redesign the scheduler 
logic to avoid starvation due to dropped tasks in critical section

Yep, so far the most promising and scalable approach is "window over 
everything", which gives us granularity at the level of individual scheduling 
entities, i.e. the objects whose parametrized limits have to be taken into 
account: DAG runs and pools. We apparently have to sort the tasks by the 
policy-defined fields, partition them by combinations of the limit-holder 
fields, and try to stuff as many tasks as we can into each such window.

We have to consider:
`pool_slots` (per pool),
`max_active_tis_per_dag` (per DAG),
`max_active_tasks` (per DAG run),
`max_active_tis_per_dagrun` (per DAG run).
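
To make the "sort, partition, fill each window" idea concrete, here is a minimal in-memory sketch in Python. It is only an illustration under assumptions: the `TI` class, the `window_select` function and the two limit dictionaries are hypothetical names, only the pool and DAG-run limits are modelled, and sorting uses priority weight alone.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class TI:
    # Hypothetical, simplified stand-in for a task instance row.
    dag_id: str
    run_id: str
    task_id: str
    pool: str
    pool_slots: int
    priority_weight: int


def window_select(tis, pool_limits, max_active_tasks):
    """Keep the tasks whose running totals stay within every limit."""
    # Highest priority first; sorted() is stable, so ties keep input order.
    ordered = sorted(tis, key=lambda t: -t.priority_weight)
    slots_taken = defaultdict(int)  # running sum of slots per pool
    run_count = defaultdict(int)    # running task count per DAG run
    selected = []
    for ti in ordered:
        run_key = (ti.dag_id, ti.run_id)
        # Running totals include every preceding row, selected or not,
        # which mirrors how independent SQL windows would behave.
        slots_taken[ti.pool] += ti.pool_slots
        run_count[run_key] += 1
        if (slots_taken[ti.pool] <= pool_limits[ti.pool]
                and run_count[run_key] <= max_active_tasks[ti.dag_id]):
            selected.append(ti)
    return selected
```

One consequence worth noting in this sketch: because each running total is computed independently, a task rejected by one limit still counts towards the other windows, so the result can be more conservative than a truly greedy fill.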

Here's a pseudo query I wrote to demonstrate this idea:
  
```
select *
from (
    select ti.*, pools.slots, dag.max_active_tasks,

    sum(ti.pool_slots) over (
            partition by ti.pool
            order by (-prw, date, idx)
            rows between unbounded preceding and current row
    ) as slots_taken,

    count(ti) over (
            partition by dr.id
            order by (-prw, date, idx)
            rows between unbounded preceding and current row
    ) as dagrun_total_active_ti,

    count(ti) over (
            partition by dag.dag_id, ti.task_id
            order by (-prw, date, idx)
            rows between unbounded preceding and current row
    ) as dag_ti_count,

    count(ti) over (
            partition by dr.id, ti.task_id
            order by (-prw, date, idx)
            rows between unbounded preceding and current row
    ) as dagrun_ti_count

    from task_instance ti
    join dag_run dr on ti.dag_id = dr.dag_id and ti.run_id = dr.run_id
    join dag on dr.dag_id = dag.dag_id
    join slot_pool as pools on ti.pool = pools.pool
) as windowed

-- window results can't be filtered in the same select, hence the outer query
where
slots_taken <= slots and
dagrun_total_active_ti <= max_active_tasks and
dag_ti_count <= ??? max_active_tis_per_dag ??? and
dagrun_ti_count <= ??? max_active_tis_per_dagrun ???

limit max_tis
```
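
For anyone who wants to try the windowing idea on real rows, here is a small runnable sketch using Python's built-in `sqlite3`. It covers only the pool-slot limit, and the two tables are cut-down, made-up versions of `slot_pool` and `task_instance` (the column set, the sample data and the `pick_tasks` helper are assumptions for illustration). It also assumes the bundled SQLite is 3.25 or newer, since that is when window functions were added.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
create table slot_pool (pool text primary key, slots integer);
create table task_instance (
    task_id text, pool text, pool_slots integer, priority_weight integer
);
insert into slot_pool values ('default', 3), ('small', 1);
insert into task_instance values
    ('a', 'default', 2, 10),
    ('b', 'default', 1, 5),
    ('c', 'default', 1, 1),
    ('d', 'small', 1, 7),
    ('e', 'small', 1, 3);
""")

# The running sum is computed in a subquery so that the outer query
# can filter on it; a window result cannot be used in its own WHERE.
QUERY = """
select task_id
from (
    select ti.task_id, ti.priority_weight, pools.slots,
        sum(ti.pool_slots) over (
            partition by ti.pool
            order by ti.priority_weight desc, ti.task_id
            rows between unbounded preceding and current row
        ) as slots_taken
    from task_instance ti
    join slot_pool pools on ti.pool = pools.pool
) as windowed
where slots_taken <= slots
order by priority_weight desc, task_id
limit ?
"""


def pick_tasks(max_tis):
    """Return up to max_tis task ids that fit into their pool's slots."""
    return [row[0] for row in conn.execute(QUERY, (max_tis,))]
```

With the sample data, `pick_tasks(10)` keeps `a` and `b` from the `default` pool and `d` from the `small` pool; `c` and `e` are dropped because the running slot total in their pool would exceed the limit.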

The ??? means the field is not in the DB; it's currently computed manually into 
a concurrency map, which complicates matters. Maybe it's worth storing these 
as temporary tables instead.
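
A rough sketch of what the temporary-table idea could look like, again with `sqlite3` so it can be run as-is. Everything here is an assumption made for illustration: the `task_limits` table, the `select_with_task_limits` helper and the shape of the `limits` dictionary are invented, `task_instance` is expected to have `dag_id`, `run_id`, `task_id` and `priority_weight` columns, and task instances that are already running are ignored.

```python
import sqlite3


def select_with_task_limits(conn, limits):
    """limits: {(dag_id, task_id): max_active_tis_per_dag} (hypothetical)."""
    # Load the limits that are not stored in the DB into a temp table
    # so that the window query can join against them.
    conn.execute("""
        create temp table if not exists task_limits (
            dag_id text, task_id text, max_active_tis_per_dag integer,
            primary key (dag_id, task_id)
        )""")
    conn.execute("delete from task_limits")
    conn.executemany(
        "insert into task_limits values (?, ?, ?)",
        [(d, t, n) for (d, t), n in limits.items()],
    )
    rows = conn.execute("""
        select dag_id, run_id, task_id
        from (
            select ti.dag_id, ti.run_id, ti.task_id, ti.priority_weight,
                lim.max_active_tis_per_dag as task_limit,
                count(*) over (
                    partition by ti.dag_id, ti.task_id
                    order by ti.priority_weight desc, ti.run_id
                    rows between unbounded preceding and current row
                ) as dag_ti_count
            from task_instance ti
            left join task_limits lim
                on lim.dag_id = ti.dag_id and lim.task_id = ti.task_id
        ) as windowed
        -- A missing row in task_limits means "no limit" in this sketch.
        where task_limit is null or dag_ti_count <= task_limit
        order by priority_weight desc, dag_id, run_id, task_id
    """)
    return rows.fetchall()
```

The same pattern would apply to `max_active_tis_per_dagrun` by partitioning on the DAG run as well; whether a temp table is cheaper than the in-memory map would need measuring.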

I hope I got the data models right: the fields are only vaguely documented and 
I relied solely on their names, so feel free to correct me if there are 
mistakes.

GitHub link: 
https://github.com/apache/airflow/discussions/49160#discussioncomment-12886468
