tanelk commented on a change in pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#discussion_r806838914



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -266,192 +266,254 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session =
 
         if pool_slots_free == 0:
             self.log.debug("All pools are full!")
-            return executable_tis
+            return []
 
         max_tis = min(max_tis, pool_slots_free)
 
-        # Get all task instances associated with scheduled
-        # DagRuns which are not backfilled, in the given states,
-        # and the dag is not paused
-        query = (
-            session.query(TI)
-            .join(TI.dag_run)
-            .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == 
DagRunState.RUNNING)
-            .join(TI.dag_model)
-            .filter(not_(DM.is_paused))
-            .filter(TI.state == TaskInstanceState.SCHEDULED)
-            .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
-        )
-        starved_pools = [pool_name for pool_name, stats in pools.items() if 
stats['open'] <= 0]
-        if starved_pools:
-            query = query.filter(not_(TI.pool.in_(starved_pools)))
-
-        query = query.limit(max_tis)
-
-        task_instances_to_examine: List[TI] = with_row_locks(
-            query,
-            of=TI,
-            session=session,
-            **skip_locked(session=session),
-        ).all()
-        # TODO[HA]: This was wrong before anyway, as it only looked at a 
sub-set of dags, not everything.
-        # Stats.gauge('scheduler.tasks.pending', 
len(task_instances_to_examine))
-
-        if len(task_instances_to_examine) == 0:
-            self.log.debug("No tasks to consider for execution.")
-            return executable_tis
-
-        # Put one task instance on each line
-        task_instance_str = "\n\t".join(repr(x) for x in 
task_instances_to_examine)
-        self.log.info("%s tasks up for execution:\n\t%s", 
len(task_instances_to_examine), task_instance_str)
-
-        pool_to_task_instances: DefaultDict[str, List[models.Pool]] = 
defaultdict(list)
-        for task_instance in task_instances_to_examine:
-            pool_to_task_instances[task_instance.pool].append(task_instance)
+        starved_pools = {pool_name for pool_name, stats in pools.items() if 
stats['open'] <= 0}
 
         # dag_id to # of running tasks and (dag_id, task_id) to # of running 
tasks.
-        dag_max_active_tasks_map: DefaultDict[str, int]
+        dag_active_tasks_map: DefaultDict[str, int]
         task_concurrency_map: DefaultDict[Tuple[str, str], int]

Review comment:
       I'm not very familiar with thw new map API, so I might be off.
   
   If you map a task over 10 values, and that task has concurrency limit of 5, 
then am I right, that we should run max 5 instances at a time. Based on that we 
should count different map_index values just as another task instances - as it 
is currently.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to