SakshamSinghal20 commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2837219667
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -183,15 +183,22 @@ def load(self, session: Session) -> None:
self.dag_run_active_tasks_map.clear()
self.task_concurrency_map.clear()
self.task_dagrun_concurrency_map.clear()
+ # Use one grouped query on TASK_CONCURRENCY_EXECUTION_STATES to exclude DEFERRED from dag_run_active_tasks_map
+ # while still counting it for task-level limits.
query = session.execute(
- select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
- .where(TI.state.in_(EXECUTION_STATES))
- .group_by(TI.task_id, TI.run_id, TI.dag_id)
+ select(TI.dag_id, TI.task_id, TI.run_id, TI.state, func.count("*"))
+ .where(TI.state.in_(TASK_CONCURRENCY_EXECUTION_STATES))
+ .group_by(TI.dag_id, TI.task_id, TI.run_id, TI.state)
)
- for dag_id, task_id, run_id, c in query:
- self.dag_run_active_tasks_map[dag_id, run_id] += c
- self.task_concurrency_map[(dag_id, task_id)] += c
- self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+ for dag_id, task_id, run_id, state, count in query:
+ # Always count towards task-level concurrency (max_active_tis_per_dag /
+ # max_active_tis_per_dagrun), including DEFERRED.
+ self.task_concurrency_map[(dag_id, task_id)] += count
+ self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += count
+ # Only count non-deferred states towards DAG-run active tasks
+ # (max_active_tasks / worker slot accounting).
+ if state != TaskInstanceState.DEFERRED:
+ self.dag_run_active_tasks_map[dag_id, run_id] += count
Review Comment:
As @Nataneljpwd mentioned, `max_active_tasks` is primarily about limiting
how many executor slots a DAG run can take up. Because a `DEFERRED` task
releases its worker slot while the Triggerer waits on its trigger, excluding
it from this limit keeps deferred tasks from needlessly blocking the rest of
the DAG run from being scheduled while they wait on external events.
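To make the split concrete, here is a minimal pure-Python sketch of the accounting in the diff above, with the grouped SQL result replaced by a hard-coded list of `(dag_id, task_id, run_id, state, count)` rows. `State` is a hypothetical stand-in for `TaskInstanceState`, and `build_maps` plus the `etl`/`poll_api`/`transform` names are illustrative only, not Airflow code.

```python
from collections import defaultdict
from enum import Enum


class State(str, Enum):
    """Hypothetical stand-in for TaskInstanceState (only the members used here)."""

    QUEUED = "queued"
    RUNNING = "running"
    DEFERRED = "deferred"


def build_maps(rows):
    """Aggregate grouped (dag_id, task_id, run_id, state, count) rows.

    Every state counts toward the task-level maps; DEFERRED is skipped
    only for the per-DAG-run map that backs max_active_tasks.
    """
    dag_run_active = defaultdict(int)    # (dag_id, run_id) -> count
    task_conc = defaultdict(int)         # (dag_id, task_id) -> count
    task_dagrun_conc = defaultdict(int)  # (dag_id, run_id, task_id) -> count
    for dag_id, task_id, run_id, state, count in rows:
        task_conc[(dag_id, task_id)] += count
        task_dagrun_conc[(dag_id, run_id, task_id)] += count
        if state != State.DEFERRED:
            dag_run_active[(dag_id, run_id)] += count
    return dag_run_active, task_conc, task_dagrun_conc


# Hard-coded rows standing in for the grouped query result (illustrative data).
rows = [
    ("etl", "poll_api", "run_1", State.DEFERRED, 4),
    ("etl", "poll_api", "run_1", State.RUNNING, 1),
    ("etl", "transform", "run_1", State.QUEUED, 2),
]
dag_run_active, task_conc, task_dagrun_conc = build_maps(rows)
print(dag_run_active[("etl", "run_1")])  # 3: the 4 deferred TIs are not counted
print(task_conc[("etl", "poll_api")])    # 5: deferred TIs still count here
```

Grouping by `state` as well is what lets a single query feed both kinds of map: each row carries its state, so the loop can decide per row which maps it contributes to.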
However, task-level limits like `max_active_tis_per_dag` and
`max_active_tis_per_dagrun` are typically configured by users to protect
external resources. Even though a task is `DEFERRED` internally, it is still
logically "active" against that external system. If we exclude `DEFERRED` tasks
from these limits, Airflow could queue up hundreds of them simultaneously,
completely bypassing the user's intended safeguard and overwhelming the
downstream service.
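A toy model of that risk, assuming a task whose every instance defers immediately (for example, while waiting on the external system) and a simplified scheduler that makes one admission check per attempt against a `max_active_tis_per_dag`-style limit. `simulate` and its parameters are hypothetical and do not mirror Airflow's real scheduling loop.

```python
def simulate(limit, include_deferred, attempts):
    """Admit TIs one at a time; each admitted TI defers right away.

    Returns how many TIs end up concurrently active (all DEFERRED).
    The include_deferred flag decides whether DEFERRED TIs count toward
    the task-level limit (hypothetical toy model, not Airflow's scheduler).
    """
    states = []
    for _ in range(attempts):
        active = sum(1 for s in states if include_deferred or s != "deferred")
        if active < limit:
            states.append("deferred")  # the new TI defers immediately
    return len(states)


# Counting DEFERRED: the limit caps concurrent instances at 5.
print(simulate(limit=5, include_deferred=True, attempts=200))   # 5
# Ignoring DEFERRED: every attempt is admitted, so the safeguard is bypassed.
print(simulate(limit=5, include_deferred=False, attempts=200))  # 200
```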
Open to advice and changes.
--
This is an automated message from the Apache Git Service.