SakshamSinghal20 commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2837219667


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -183,15 +183,22 @@ def load(self, session: Session) -> None:
         self.dag_run_active_tasks_map.clear()
         self.task_concurrency_map.clear()
         self.task_dagrun_concurrency_map.clear()
+        # Use one grouped query on TASK_CONCURRENCY_EXECUTION_STATES to
+        # exclude DEFERRED from dag_run_active_tasks_map
+        # while still counting it for task-level limits.
         query = session.execute(
-            select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
-            .where(TI.state.in_(EXECUTION_STATES))
-            .group_by(TI.task_id, TI.run_id, TI.dag_id)
+            select(TI.dag_id, TI.task_id, TI.run_id, TI.state, func.count("*"))
+            .where(TI.state.in_(TASK_CONCURRENCY_EXECUTION_STATES))
+            .group_by(TI.dag_id, TI.task_id, TI.run_id, TI.state)
         )
-        for dag_id, task_id, run_id, c in query:
-            self.dag_run_active_tasks_map[dag_id, run_id] += c
-            self.task_concurrency_map[(dag_id, task_id)] += c
-            self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+        for dag_id, task_id, run_id, state, count in query:
+            # Always count towards task-level concurrency (max_active_tis_per_dag /
+            # max_active_tis_per_dagrun), including DEFERRED.
+            self.task_concurrency_map[(dag_id, task_id)] += count
+            self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += count
+            # Only count non-deferred states towards DAG-run active tasks
+            # (max_active_tasks / worker slot accounting).
+            if state != TaskInstanceState.DEFERRED:
+                self.dag_run_active_tasks_map[dag_id, run_id] += count

Review Comment:
   As @Nataneljpwd  mentioned, `max_active_tasks` is primarily about limiting 
how many executor slots a DAG run can take up. Because a `DEFERRED` task 
releases its worker slot to the Triggerer, excluding it from this limit ensures 
that a DAG doesn't needlessly block other tasks from being scheduled while it 
waits on external events. 
   However, task-level limits like `max_active_tis_per_dag` and 
`max_active_tis_per_dagrun` are typically configured by users to protect 
external resources. Even though a task is `DEFERRED` internally, it is still 
logically "active" against that external system. If we excluded `DEFERRED` tasks 
from these limits, Airflow could queue up hundreds of them simultaneously, 
completely bypassing the user's intended safeguard and overwhelming the 
downstream service.
   
   Open to advice and changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to