turbaszek commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r492048058



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -629,6 +629,7 @@ def _per_task_process(task, key, ti, session=None):  # pylint: disable=too-many-
             _dag_runs = ti_status.active_runs[:]
             for run in _dag_runs:
                 run.update_state(session=session)
+                session.merge(run)

Review comment:
       Is this change related to scheduler HA?
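
For context, a minimal standalone SQLAlchemy sketch (a hypothetical `Run` model, not Airflow's) of what `session.merge` does here: it looks up the row with the same primary key and copies the given object's state onto the persistent instance, so changes made outside the session still get flushed.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Run(Base):
    __tablename__ = "run"
    id = Column(Integer, primary_key=True)
    state = Column(String)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
SessionFactory = sessionmaker(bind=engine)

session = SessionFactory()
session.add(Run(id=1, state="running"))
session.commit()
session.close()

# An object built outside any session; merge() finds the persistent row
# with the same primary key and copies this object's state onto it, so
# the update is flushed at commit time.
outside = Run(id=1, state="success")
session = SessionFactory()
session.merge(outside)
session.commit()
assert session.query(Run).get(1).state == "success"
```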

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -868,122 +608,41 @@ def process_file(
         except Exception:  # pylint: disable=broad-except
             self.log.exception("Failed at reloading the DAG file %s", 
file_path)
             Stats.incr('dag_file_refresh_error', 1, 1)
-            return [], 0
+            return 0, 0
 
         if len(dagbag.dags) > 0:
             self.log.info("DAG(s) %s retrieved from %s", dagbag.dags.keys(), 
file_path)
         else:
             self.log.warning("No viable dags retrieved from %s", file_path)
             self.update_import_errors(session, dagbag)
-            return [], len(dagbag.import_errors)
+            return 0, len(dagbag.import_errors)
 
         try:
             self.execute_on_failure_callbacks(dagbag, failure_callback_requests)
         except Exception:  # pylint: disable=broad-except
             self.log.exception("Error executing failure callback!")
 
-        # Save individual DAGs in the ORM and update DagModel.last_scheduled_time
+        # Save individual DAGs in the ORM
+        dagbag.read_dags_from_db = True

Review comment:
       Why don't we initialize the `DagBag` at L607 with this parameter?
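
For reference, the two shapes under discussion would be roughly as below; the actual constructor call at L607 is not quoted here, and the keyword argument is assumed from the 2.0-era `DagBag` signature.

```python
from airflow.models.dagbag import DagBag

file_path = "dags/example.py"  # hypothetical

# Shape in the PR: parse DAGs from files first, then flip the flag.
dagbag = DagBag(dag_folder=file_path, include_examples=False)
dagbag.read_dags_from_db = True

# Shape the question suggests: pass the flag at construction time.
# Note this may change behaviour: with the flag set up front, the bag
# reads serialized DAGs from the DB instead of parsing file_path, which
# is presumably why the code sets it only after parsing.
dagbag = DagBag(dag_folder=file_path, include_examples=False,
                read_dags_from_db=True)
```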

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1179,39 +832,50 @@ def __get_concurrency_maps(
 
     # pylint: disable=too-many-locals,too-many-statements
     @provide_session
-    def _find_executable_task_instances(
-        self,
-        simple_dag_bag: SimpleDagBag,
-        session: Session = None
-    ) -> List[TI]:
+    def _executable_task_instances_to_queued(self, max_tis: int, session: Session = None) -> List[TI]:
         """
         Finds TIs that are ready for execution with respect to pool limits,
         dag concurrency, executor state, and priority.
 
-        :param simple_dag_bag: TaskInstances associated with DAGs in the
-            simple_dag_bag will be fetched from the DB and executed
-        :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
+        :param max_tis: Maximum number of TIs to queue in this loop.
+        :type max_tis: int
         :return: list[airflow.models.TaskInstance]
         """
         executable_tis: List[TI] = []
 
+        # Get the pool settings. We get a lock on the pool rows, treating this as a "critical section"
+        # Throws an exception if lock cannot be obtained, rather than blocking
+        pools = models.Pool.slots_stats(with_for_update=nowait(session), session=session)
+
+        # If the pools are full, there is no point doing anything!
+        # If _somehow_ the pool is overfull, don't let the limit go negative - it breaks SQL
+        pool_slots_free = max(0, sum(map(operator.itemgetter('open'), pools.values())))

Review comment:
       ```suggestion
        pool_slots_free = max(0, sum(v["open"] for v in pools.values()))
       ```
       How about simplifying it?
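
Both spellings should be equivalent; a quick standalone check, with the per-pool stats shape assumed from the diff (each value is a dict with an `open` key):

```python
import operator

# Hypothetical slots_stats() output: one stats dict per pool.
pools = {
    "default_pool": {"open": 3, "queued": 1},
    "backfill": {"open": 0, "queued": 2},
}

# Form in the diff, via operator.itemgetter.
a = max(0, sum(map(operator.itemgetter("open"), pools.values())))
# Suggested form: a plain generator expression, no extra import.
b = max(0, sum(v["open"] for v in pools.values()))
assert a == b == 3
```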

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1179,39 +832,50 @@ def __get_concurrency_maps(
 
     # pylint: disable=too-many-locals,too-many-statements
     @provide_session
-    def _find_executable_task_instances(
-        self,
-        simple_dag_bag: SimpleDagBag,
-        session: Session = None
-    ) -> List[TI]:
+    def _executable_task_instances_to_queued(self, max_tis: int, session: Session = None) -> List[TI]:
         """
         Finds TIs that are ready for execution with respect to pool limits,
         dag concurrency, executor state, and priority.
 
-        :param simple_dag_bag: TaskInstances associated with DAGs in the
-            simple_dag_bag will be fetched from the DB and executed
-        :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
+        :param max_tis: Maximum number of TIs to queue in this loop.
+        :type max_tis: int
         :return: list[airflow.models.TaskInstance]
         """
         executable_tis: List[TI] = []
 
+        # Get the pool settings. We get a lock on the pool rows, treating this as a "critical section"
+        # Throws an exception if lock cannot be obtained, rather than blocking
+        pools = models.Pool.slots_stats(with_for_update=nowait(session), session=session)
+
+        # If the pools are full, there is no point doing anything!
+        # If _somehow_ the pool is overfull, don't let the limit go negative - it breaks SQL
+        pool_slots_free = max(0, sum(map(operator.itemgetter('open'), pools.values())))
+
+        if pool_slots_free == 0:
+            self.log.debug("All pools are full!")
+            return executable_tis
+
+        max_tis = min(max_tis, pool_slots_free)
+
         # Get all task instances associated with scheduled
         # DagRuns which are not backfilled, in the given states,
         # and the dag is not paused
         task_instances_to_examine: List[TI] = (
             session
             .query(TI)
-            .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
-            .outerjoin(
-                DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
-            )
-            .filter(or_(DR.run_id.is_(None), DR.run_type != DagRunType.BACKFILL_JOB.value))
-            .outerjoin(DM, DM.dag_id == TI.dag_id)
-            .filter(or_(DM.dag_id.is_(None), not_(DM.is_paused)))
+            .outerjoin(TI.dag_run)
+            .filter(or_(DR.run_id.is_(None),

Review comment:
       Is it possible that `DagRun.run_id` will be None?
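
For what it's worth, because of the `outerjoin` the joined columns come back as NULL for any TI with no matching `DagRun` row, which seems to be what the filter is checking for. A minimal standalone sketch (hypothetical tables, not Airflow's schema):

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class DagRun(Base):
    __tablename__ = "dag_run"
    run_id = Column(String, primary_key=True)
    dag_id = Column(String)

class TaskInstance(Base):
    __tablename__ = "task_instance"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

session.add_all([
    TaskInstance(id=1, dag_id="has_run"),
    TaskInstance(id=2, dag_id="orphan"),  # no matching DagRun row
    DagRun(run_id="r1", dag_id="has_run"),
])
session.commit()

# LEFT OUTER JOIN: the orphan TI pairs with NULLs on the DagRun side,
# so DagRun.run_id IS NULL keeps exactly those TIs.
rows = (
    session.query(TaskInstance.id, DagRun.run_id)
    .outerjoin(DagRun, DagRun.dag_id == TaskInstance.dag_id)
    .filter(DagRun.run_id.is_(None))
    .all()
)
assert rows == [(2, None)]
```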



