ashb commented on a change in pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#discussion_r806800672



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -266,192 +266,254 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session =
 
         if pool_slots_free == 0:
             self.log.debug("All pools are full!")
-            return executable_tis
+            return []
 
         max_tis = min(max_tis, pool_slots_free)
 
-        # Get all task instances associated with scheduled
-        # DagRuns which are not backfilled, in the given states,
-        # and the dag is not paused
-        query = (
-            session.query(TI)
-            .join(TI.dag_run)
-            .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == 
DagRunState.RUNNING)
-            .join(TI.dag_model)
-            .filter(not_(DM.is_paused))
-            .filter(TI.state == TaskInstanceState.SCHEDULED)
-            .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
-        )
-        starved_pools = [pool_name for pool_name, stats in pools.items() if 
stats['open'] <= 0]
-        if starved_pools:
-            query = query.filter(not_(TI.pool.in_(starved_pools)))
-
-        query = query.limit(max_tis)
-
-        task_instances_to_examine: List[TI] = with_row_locks(
-            query,
-            of=TI,
-            session=session,
-            **skip_locked(session=session),
-        ).all()
-        # TODO[HA]: This was wrong before anyway, as it only looked at a 
sub-set of dags, not everything.
-        # Stats.gauge('scheduler.tasks.pending', 
len(task_instances_to_examine))
-
-        if len(task_instances_to_examine) == 0:
-            self.log.debug("No tasks to consider for execution.")
-            return executable_tis
-
-        # Put one task instance on each line
-        task_instance_str = "\n\t".join(repr(x) for x in 
task_instances_to_examine)
-        self.log.info("%s tasks up for execution:\n\t%s", 
len(task_instances_to_examine), task_instance_str)
-
-        pool_to_task_instances: DefaultDict[str, List[models.Pool]] = 
defaultdict(list)
-        for task_instance in task_instances_to_examine:
-            pool_to_task_instances[task_instance.pool].append(task_instance)
+        starved_pools = {pool_name for pool_name, stats in pools.items() if 
stats['open'] <= 0}
 
         # dag_id to # of running tasks and (dag_id, task_id) to # of running 
tasks.
-        dag_max_active_tasks_map: DefaultDict[str, int]
+        dag_active_tasks_map: DefaultDict[str, int]
         task_concurrency_map: DefaultDict[Tuple[str, str], int]

Review comment:
       Hmmm, I wonder if this needs to include map_index now so,
   
   ```suggestion
           task_concurrency_map: DefaultDict[Tuple[str, str, int], int]
   ```
   
   Making the tuple (dag_id, task_id, map_index).
   
   See https://github.com/apache/airflow/pull/21210 for a bit more info.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to