Nataneljpwd commented on issue #49508:
URL: https://github.com/apache/airflow/issues/49508#issuecomment-2877636652
I think I agree with what @Asquator mentioned: windowing should solve this
issue and fix most of the edge cases where dags are starved, as well as tasks
that are starved or stuck in queued. On each iteration it selects `max_tis`
tasks to schedule if possible and, if not, the largest number of tasks that
can be scheduled. It might increase the time it takes to run the query, but
it will not necessarily increase the time spent in the critical section, as
we will send fewer queries to the database when we encounter starving tasks
or tasks that we cannot schedule.
I think that, if written in sql (a rough draft, might contain mistakes), the
windowing will look something like this:
```sql
-- rough, untested draft
with candidates as (
    select
        ti.*,
        pool.slots as pool_total_slots,
        dag.max_active_tasks,
        -- running total of pool slots, in scheduling order
        sum(ti.pool_slots) over (
            partition by ti.pool
            order by
                ti.priority_weight desc, -- to get tasks by priority
                ti.updated_at,           -- to favor tasks which are longer in queue
                ti.map_index             -- to allow mapped tasks to run in order
        ) as slots_taken,
        -- running count of candidate tasks per dag run, same ordering as before
        count(*) over (
            partition by dag_run.id
            order by ti.priority_weight desc, ti.updated_at, ti.map_index
        ) as tasks_per_dag,
        -- tasks of the same dag that are already running
        (
            select count(*)
            from task_instance as running
            where running.state = 'running'
              and running.dag_id = ti.dag_id
        ) as running_tasks_per_dag
    from task_instance as ti
    join dag_run on dag_run.dag_id = ti.dag_id and dag_run.run_id = ti.run_id
    join dag on dag.dag_id = ti.dag_id
    join slot_pool as pool on pool.pool = ti.pool
    where ti.state = 'scheduled'
)
select *
from candidates
where slots_taken <= pool_total_slots
  and tasks_per_dag <= max_active_tasks - running_tasks_per_dag
  /* and max_ti_per_task_per_dagrun <= dag.max_ti_per_task_per_dagrun
     -- needs to be added to the database */
order by priority_weight desc, updated_at, map_index
limit {{max_tis}}
```
This query is untested, and heavily inspired by the query @Asquator wrote in
#49160 with small calculation fixes.
For this to work, a column for `max_active_tis_per_dagrun` needs to be added
to the `dag` model in the airflow db. The setting exists since #29094 but is
not part of the database model; once it is, the query can be completed.
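To make the intended semantics of the windowed selection easier to reason
about, here is a small pure-Python sketch of what the running sum per pool and
the running count per dag run compute. The `TI` class, `select_schedulable`
and its parameters are hypothetical names for illustration only, not Airflow
code, and the limits are passed in as plain dicts instead of being read from
the database.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class TI:
    # Hypothetical, simplified stand-in for a scheduled task instance row.
    key: str
    pool: str
    dag_run: str
    priority_weight: int
    updated_at: int  # smaller value means the task has waited longer
    pool_slots: int = 1


def select_schedulable(tis, pool_free, dag_run_free, max_tis):
    """Mimic the windowed query with running totals per pool and per dag run."""
    ordered = sorted(tis, key=lambda t: (-t.priority_weight, t.updated_at))
    slots_taken = defaultdict(int)    # sum(pool_slots) over (partition by pool)
    tasks_per_run = defaultdict(int)  # count(*) over (partition by dag run)
    picked = []
    for ti in ordered:
        # Like a SQL window, the totals include the current row and also
        # every earlier row, even rows that end up being filtered out.
        slots_taken[ti.pool] += ti.pool_slots
        tasks_per_run[ti.dag_run] += 1
        fits_pool = slots_taken[ti.pool] <= pool_free[ti.pool]
        fits_run = tasks_per_run[ti.dag_run] <= dag_run_free[ti.dag_run]
        if fits_pool and fits_run:
            picked.append(ti.key)
    return picked[:max_tis]


tis = [
    TI("a1", "default", "runA", 10, 1),
    TI("a2", "default", "runA", 10, 2),
    TI("a3", "default", "runA", 10, 3),
    TI("b1", "default", "runB", 1, 1),
]
print(select_schedulable(tis, {"default": 4}, {"runA": 2, "runB": 5}, 10))
```

With 4 free pool slots, `a3` is skipped because of the per-run limit and `b1`
still gets in. With only 3 free slots `b1` is left out, because the skipped
`a3` still counts towards the running pool total. That is one of the cases
where a single windowed query can still leave a task waiting.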
Another, quicker solution that could work is to make the prioritization
probability based rather than strictly largest first. For example, take all
priorities, give each of the top N priorities at least 1 slot, and then
distribute the remaining task instances by relative weight; or give the top
priority x slots, the next in line x / 2 + leftover, and so on.
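As a rough illustration of the first variant, here is a deterministic sketch
(so not truly probabilistic) that gives each of the top N priorities one slot
and then splits the rest by relative weight. `distribute_slots` and its
arguments are hypothetical names, and it assumes priority weights are
positive integers.

```python
def distribute_slots(queued, total_slots, top_n):
    """queued maps priority_weight -> number of waiting task instances."""
    top = sorted((p for p, n in queued.items() if n > 0), reverse=True)[:top_n]
    alloc = dict.fromkeys(top, 0)
    remaining = total_slots
    # Step 1: every top priority gets one slot while slots last.
    for p in top:
        if remaining == 0:
            break
        alloc[p] = 1
        remaining -= 1
    # Step 2: split what is left proportionally to the priority weight,
    # never giving a priority more slots than it has waiting tasks.
    total_weight = sum(top)
    budget = remaining
    for p in top:
        share = min(budget * p // total_weight, queued[p] - alloc[p])
        alloc[p] += share
        remaining -= share
    # Step 3: rounding leftovers go to the highest priorities with demand.
    for p in top:
        extra = min(remaining, queued[p] - alloc[p])
        alloc[p] += extra
        remaining -= extra
    return alloc


print(distribute_slots({10: 20, 5: 20, 1: 20}, total_slots=19, top_n=3))
```

In this example the lowest priority still gets 2 of the 19 slots instead of
none, which is the point of moving away from strictly largest first.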
The second solution could also be implemented using counts and windows over
priority, which will not make the query too heavy but might increase the
query time. We could also think of a simple mathematical algorithm that does
it for any fixed number of priorities and implement it in sql with almost no
impact on performance.
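One candidate for such a simple rule is the halving scheme mentioned above.
A minimal sketch, assuming "leftover" means the slots a higher priority was
allowed but did not use (`halving_allocation` is a hypothetical name, not an
existing function):

```python
def halving_allocation(queued, x):
    """Top priority may use up to x slots, the next x // 2 plus leftover, etc."""
    alloc = {}
    quota, leftover = x, 0
    for p in sorted(queued, reverse=True):
        allowed = quota + leftover
        used = min(allowed, queued[p])  # cannot use more than is waiting
        alloc[p] = used
        leftover = allowed - used       # unused slots roll down one level
        quota //= 2
    return alloc


print(halving_allocation({10: 3, 5: 10, 1: 10}, x=8))
```

Here priority 10 only needs 3 of its 8 slots, so the remaining 5 roll over to
priority 5, which gets 4 + 5 = 9, and priority 1 gets 2.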
Just changing the sql query for the scheduler might work for most cases,
however there might still be cases where starvation occurs. The second
solution should fix starvation of dags and tasks better, but it might not
maximize the performance of the scheduled-to-queued loop.