ashb commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r491876853



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1705,62 +1305,216 @@ def _run_scheduler_loop(self) -> None:
             loop_duration = loop_end_time - loop_start_time
             self.log.debug("Ran scheduling loop in %.2f seconds", 
loop_duration)
 
-            if not is_unit_test:
+            if not is_unit_test and not num_queued_tis and not 
num_finished_events:
+                # If the scheduler is doing things, don't sleep. This means 
when there is work to do, the
+                # scheduler will run "as quick as possible", but when it's 
stopped, it can sleep, dropping CPU
+                # usage when "idle"
                 time.sleep(self._processor_poll_interval)
 
-            if self.processor_agent.done:
+            if self.num_runs > 0 and loop_count >= self.num_runs and 
self.processor_agent.done:
                 self.log.info(
-                    "Exiting scheduler loop as all files have been processed 
%d times", self.num_runs
+                    "Exiting scheduler loop as requested number of runs (%d - 
got to %d) has been reached",
+                    self.num_runs, loop_count,
                 )
                 break
 
-    def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag) 
-> bool:
-        if simple_dag_bag.serialized_dags:
+    def _do_scheduling(self, session) -> int:
+        """
+        This function is where the main scheduling decisions take places. It:
+
+        - Creates any necessary DAG runs by examining the 
next_dagrun_create_after column of DagModel
+
+        - Finds the "next n oldest" running DAG Runs to examine for scheduling 
(n=20 by default) and tries to
+          progress state (TIs to SCHEDULED, or DagRuns to SUCCESS/FAILURE etc)
+
+          By "next oldest", we mean hasn't been examined/scheduled in the most 
time.
+
+        - Then, via a Critical Section (locking the rows of the Pool model) we 
queue tasks, and then send them
+          to the executor.
+
+          See docs of _critical_section_execute_task_instances for more.
+
+        :return: Number of TIs enqueued in this iteration
+        :rtype: int
+        """
+        try:
+            from sqlalchemy import event
+            expected_commit = False
+
+            # Put a check in place to make sure we don't commit unexpectedly
+            @event.listens_for(session.bind, 'commit')
+            def validate_commit(_):
+                nonlocal expected_commit
+                if expected_commit:
+                    expected_commit = False
+                    return
+                raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA 
LOCKS!")
+
+            query = DagModel.dags_needing_dagruns(session)

Review comment:
       Yup

##########
File path: airflow/jobs/backfill_job.py
##########
@@ -629,6 +629,7 @@ def _per_task_process(task, key, ti, session=None):  # pylint: disable=too-many-
             _dag_runs = ti_status.active_runs[:]
             for run in _dag_runs:
                 run.update_state(session=session)
+                session.merge(run)

Review comment:
       It is, yes, but I can do this better
   
   In 
[dagrun.py](https://github.com/apache/airflow/pull/10956/files#diff-32aa8dbb910719ef24a39cab5d0f2a97L376-L379)
 I removed the commit (on purpose) but also removed the merge by mistake:
   
   ```diff
   -        # todo: determine we want to use with_for_update to make sure to 
lock the run
   -        session.merge(self)
   -        session.commit()
   -
   ```
   
   ->
   
   
   ```diff
   -        # todo: determine we want to use with_for_update to make sure to 
lock the run
           session.merge(self)
   -        session.commit()
   -
   ```

##########
File path: airflow/jobs/backfill_job.py
##########
@@ -629,6 +629,7 @@ def _per_task_process(task, key, ti, session=None):  # pylint: disable=too-many-
             _dag_runs = ti_status.active_runs[:]
             for run in _dag_runs:
                 run.update_state(session=session)
+                session.merge(run)

Review comment:
       See https://github.com/apache/airflow/pull/10956#discussion_r492048058 
(can remove this entirely.)

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1052,6 +703,8 @@ def __init__(
         self.max_tis_per_query: int = conf.getint('scheduler', 'max_tis_per_query')
         self.processor_agent: Optional[DagFileProcessorAgent] = None
 
+        self.dagbag = DagBag(read_dags_from_db=True)

Review comment:
       Do you mean have a DbDagBag and a FileDagBag class, or something of that 
nature?
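   
   i.e. something along these lines? (Purely a hypothetical sketch -- neither class exists today and the names are made up; the only real bit is the existing `read_dags_from_db` argument.)
   
    ```python
    from airflow.models.dagbag import DagBag


    class DbDagBag(DagBag):
        """Hypothetical: a bag that only ever loads serialized DAGs from the metadata DB."""

        def __init__(self, **kwargs):
            kwargs["read_dags_from_db"] = True
            super().__init__(**kwargs)


    class FileDagBag(DagBag):
        """Hypothetical: a bag that only ever parses DAG files from the DAGs folder."""

        def __init__(self, **kwargs):
            kwargs["read_dags_from_db"] = False
            super().__init__(**kwargs)
    ```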

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1179,39 +832,50 @@ def __get_concurrency_maps(
 
     # pylint: disable=too-many-locals,too-many-statements
     @provide_session
-    def _find_executable_task_instances(
-        self,
-        simple_dag_bag: SimpleDagBag,
-        session: Session = None
-    ) -> List[TI]:
+    def _executable_task_instances_to_queued(self, max_tis: int, session: Session = None) -> List[TI]:
         """
         Finds TIs that are ready for execution with respect to pool limits,
         dag concurrency, executor state, and priority.
 
-        :param simple_dag_bag: TaskInstances associated with DAGs in the
-            simple_dag_bag will be fetched from the DB and executed
-        :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
+        :param max_tis: Maximum number of TIs to queue in this loop.
+        :type max_tis: int
         :return: list[airflow.models.TaskInstance]
         """
         executable_tis: List[TI] = []
 
+        # Get the pool settings. We get a lock on the pool rows, treating this as a "critical section"
+        # Throws an exception if lock cannot be obtained, rather than blocking
+        pools = models.Pool.slots_stats(with_for_update=nowait(session), session=session)
+
+        # If the pools are full, there is no point doing anything!
+        # If _somehow_ the pool is overfull, don't let the limit go negative - it breaks SQL
+        pool_slots_free = max(0, sum(map(operator.itemgetter('open'), pools.values())))
+
+        if pool_slots_free == 0:
+            self.log.debug("All pools are full!")
+            return executable_tis
+
+        max_tis = min(max_tis, pool_slots_free)
+
         # Get all task instances associated with scheduled
         # DagRuns which are not backfilled, in the given states,
         # and the dag is not paused
         task_instances_to_examine: List[TI] = (
             session
             .query(TI)
-            .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
-            .outerjoin(
-                DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
-            )
-            .filter(or_(DR.run_id.is_(None), DR.run_type != DagRunType.BACKFILL_JOB.value))
-            .outerjoin(DM, DM.dag_id == TI.dag_id)
-            .filter(or_(DM.dag_id.is_(None), not_(DM.is_paused)))
+            .outerjoin(TI.dag_run)
+            .filter(or_(DR.run_id.is_(None),
+                        DR.run_type != DagRunType.BACKFILL_JOB.value))
+            .join(TI.dag_model)
+            .filter(not_(DM.is_paused))
             .filter(TI.state == State.SCHEDULED)
+            .options(selectinload('dag_model'))
+            .limit(max_tis)
+            .with_for_update(of=TI, **skip_locked(session=session))

Review comment:
       It didn't seem to cause any problems on 5.7 when running with just a 
single scheduler -- we ran some of our medium sized benchmarks and it behaved 
as before.

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1705,62 +1305,216 @@ def _run_scheduler_loop(self) -> None:
             loop_duration = loop_end_time - loop_start_time
             self.log.debug("Ran scheduling loop in %.2f seconds", 
loop_duration)
 
-            if not is_unit_test:
+            if not is_unit_test and not num_queued_tis and not 
num_finished_events:
+                # If the scheduler is doing things, don't sleep. This means 
when there is work to do, the
+                # scheduler will run "as quick as possible", but when it's 
stopped, it can sleep, dropping CPU
+                # usage when "idle"
                 time.sleep(self._processor_poll_interval)
 
-            if self.processor_agent.done:
+            if self.num_runs > 0 and loop_count >= self.num_runs and 
self.processor_agent.done:
                 self.log.info(
-                    "Exiting scheduler loop as all files have been processed 
%d times", self.num_runs
+                    "Exiting scheduler loop as requested number of runs (%d - 
got to %d) has been reached",
+                    self.num_runs, loop_count,
                 )
                 break
 
-    def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag) 
-> bool:
-        if simple_dag_bag.serialized_dags:
+    def _do_scheduling(self, session) -> int:
+        """
+        This function is where the main scheduling decisions take places. It:
+
+        - Creates any necessary DAG runs by examining the 
next_dagrun_create_after column of DagModel
+
+        - Finds the "next n oldest" running DAG Runs to examine for scheduling 
(n=20 by default) and tries to
+          progress state (TIs to SCHEDULED, or DagRuns to SUCCESS/FAILURE etc)
+
+          By "next oldest", we mean hasn't been examined/scheduled in the most 
time.
+
+        - Then, via a Critical Section (locking the rows of the Pool model) we 
queue tasks, and then send them
+          to the executor.
+
+          See docs of _critical_section_execute_task_instances for more.
+
+        :return: Number of TIs enqueued in this iteration
+        :rtype: int
+        """
+        try:
+            from sqlalchemy import event
+            expected_commit = False
+
+            # Put a check in place to make sure we don't commit unexpectedly
+            @event.listens_for(session.bind, 'commit')
+            def validate_commit(_):
+                nonlocal expected_commit
+                if expected_commit:
+                    expected_commit = False
+                    return
+                raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA 
LOCKS!")

Review comment:
       Sure, will create a txn manager.
   
   I think keeping it for unit/integration tests is the right thing to do, I'm 
open to discussion if we should remove it for "runtime".
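   
   Roughly the shape I have in mind for the txn manager -- a minimal sketch only, assuming we keep the engine-level `commit` event; the class and method names are placeholders, not necessarily what will land in the PR:
   
    ```python
    import contextlib

    from sqlalchemy import event


    class CommitProhibitorGuard:
        """Refuse any commit on this session's engine unless it was explicitly expected."""

        def __init__(self, session):
            self.session = session
            self.expected_commit = False
            # Same engine-level hook as in the diff above, just owned by an object
            # that tests can install (and assert against)
            event.listen(session.bind, 'commit', self._validate_commit)

        def _validate_commit(self, _):
            if self.expected_commit:
                self.expected_commit = False
                return
            raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")

        @contextlib.contextmanager
        def guard_commit(self):
            # Mark the next commit as expected, resetting the flag afterwards
            self.expected_commit = True
            try:
                yield
            finally:
                self.expected_commit = False

        def commit(self):
            # The only sanctioned way to commit while the guard is installed
            with self.guard_commit():
                self.session.commit()
    ```
   
   `_do_scheduling` would then create the guard up front and call `guard.commit()` wherever it currently calls `session.commit()`.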

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1179,39 +832,50 @@ def __get_concurrency_maps(
 
     # pylint: disable=too-many-locals,too-many-statements
     @provide_session
-    def _find_executable_task_instances(
-        self,
-        simple_dag_bag: SimpleDagBag,
-        session: Session = None
-    ) -> List[TI]:
+    def _executable_task_instances_to_queued(self, max_tis: int, session: Session = None) -> List[TI]:
         """
         Finds TIs that are ready for execution with respect to pool limits,
         dag concurrency, executor state, and priority.
 
-        :param simple_dag_bag: TaskInstances associated with DAGs in the
-            simple_dag_bag will be fetched from the DB and executed
-        :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
+        :param max_tis: Maximum number of TIs to queue in this loop.
+        :type max_tis: int
         :return: list[airflow.models.TaskInstance]
         """
         executable_tis: List[TI] = []
 
+        # Get the pool settings. We get a lock on the pool rows, treating this as a "critical section"
+        # Throws an exception if lock cannot be obtained, rather than blocking
+        pools = models.Pool.slots_stats(with_for_update=nowait(session), session=session)
+
+        # If the pools are full, there is no point doing anything!
+        # If _somehow_ the pool is overfull, don't let the limit go negative - it breaks SQL
+        pool_slots_free = max(0, sum(map(operator.itemgetter('open'), pools.values())))
+
+        if pool_slots_free == 0:
+            self.log.debug("All pools are full!")
+            return executable_tis
+
+        max_tis = min(max_tis, pool_slots_free)
+
         # Get all task instances associated with scheduled
         # DagRuns which are not backfilled, in the given states,
         # and the dag is not paused
         task_instances_to_examine: List[TI] = (
             session
             .query(TI)
-            .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
-            .outerjoin(
-                DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
-            )
-            .filter(or_(DR.run_id.is_(None), DR.run_type != DagRunType.BACKFILL_JOB.value))
-            .outerjoin(DM, DM.dag_id == TI.dag_id)
-            .filter(or_(DM.dag_id.is_(None), not_(DM.is_paused)))
+            .outerjoin(TI.dag_run)
+            .filter(or_(DR.run_id.is_(None),

Review comment:
       Exceedingly unlikely (maybe if you deleted the dag_run via the Browse -> DagRuns view; I think that used to be possible but isn't anymore), but the existing code allowed it, and we have unit tests that check for it, so to keep this change smaller I didn't remove it.

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1179,39 +832,50 @@ def __get_concurrency_maps(
 
     # pylint: disable=too-many-locals,too-many-statements
     @provide_session
-    def _find_executable_task_instances(
-        self,
-        simple_dag_bag: SimpleDagBag,
-        session: Session = None
-    ) -> List[TI]:
+    def _executable_task_instances_to_queued(self, max_tis: int, session: Session = None) -> List[TI]:
         """
         Finds TIs that are ready for execution with respect to pool limits,
         dag concurrency, executor state, and priority.
 
-        :param simple_dag_bag: TaskInstances associated with DAGs in the
-            simple_dag_bag will be fetched from the DB and executed
-        :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
+        :param max_tis: Maximum number of TIs to queue in this loop.
+        :type max_tis: int
         :return: list[airflow.models.TaskInstance]
         """
         executable_tis: List[TI] = []
 
+        # Get the pool settings. We get a lock on the pool rows, treating this as a "critical section"
+        # Throws an exception if lock cannot be obtained, rather than blocking
+        pools = models.Pool.slots_stats(with_for_update=nowait(session), session=session)
+
+        # If the pools are full, there is no point doing anything!
+        # If _somehow_ the pool is overfull, don't let the limit go negative - it breaks SQL
+        pool_slots_free = max(0, sum(map(operator.itemgetter('open'), pools.values())))
+
+        if pool_slots_free == 0:
+            self.log.debug("All pools are full!")
+            return executable_tis
+
+        max_tis = min(max_tis, pool_slots_free)
+
         # Get all task instances associated with scheduled
         # DagRuns which are not backfilled, in the given states,
         # and the dag is not paused
         task_instances_to_examine: List[TI] = (
             session
             .query(TI)
-            .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
-            .outerjoin(
-                DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
-            )
-            .filter(or_(DR.run_id.is_(None), DR.run_type != DagRunType.BACKFILL_JOB.value))
-            .outerjoin(DM, DM.dag_id == TI.dag_id)
-            .filter(or_(DM.dag_id.is_(None), not_(DM.is_paused)))
+            .outerjoin(TI.dag_run)
+            .filter(or_(DR.run_id.is_(None),
+                        DR.run_type != DagRunType.BACKFILL_JOB.value))
+            .join(TI.dag_model)
+            .filter(not_(DM.is_paused))
             .filter(TI.state == State.SCHEDULED)
+            .options(selectinload('dag_model'))
+            .limit(max_tis)
+            .with_for_update(of=TI, **skip_locked(session=session))

Review comment:
       > * MySQL 5.7 -> go with 1 scheduler (and escape hatch if possible/easy 
to implement). Experiment if you're adventurous, but expect moderate gains and 
higher risks of concurrency related problems
   > 
   > * MySQL 8 -> should work fine and bring expected gains, with much lower 
risk of concurrency problems.
   
   Yes, this matches with our testing so far.

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1705,62 +1305,216 @@ def _run_scheduler_loop(self) -> None:
             loop_duration = loop_end_time - loop_start_time
             self.log.debug("Ran scheduling loop in %.2f seconds", 
loop_duration)
 
-            if not is_unit_test:
+            if not is_unit_test and not num_queued_tis and not 
num_finished_events:
+                # If the scheduler is doing things, don't sleep. This means 
when there is work to do, the
+                # scheduler will run "as quick as possible", but when it's 
stopped, it can sleep, dropping CPU
+                # usage when "idle"
                 time.sleep(self._processor_poll_interval)
 
-            if self.processor_agent.done:
+            if self.num_runs > 0 and loop_count >= self.num_runs and 
self.processor_agent.done:
                 self.log.info(
-                    "Exiting scheduler loop as all files have been processed 
%d times", self.num_runs
+                    "Exiting scheduler loop as requested number of runs (%d - 
got to %d) has been reached",
+                    self.num_runs, loop_count,
                 )
                 break
 
-    def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag) 
-> bool:
-        if simple_dag_bag.serialized_dags:
+    def _do_scheduling(self, session) -> int:
+        """
+        This function is where the main scheduling decisions take places. It:
+
+        - Creates any necessary DAG runs by examining the 
next_dagrun_create_after column of DagModel
+
+        - Finds the "next n oldest" running DAG Runs to examine for scheduling 
(n=20 by default) and tries to
+          progress state (TIs to SCHEDULED, or DagRuns to SUCCESS/FAILURE etc)
+
+          By "next oldest", we mean hasn't been examined/scheduled in the most 
time.
+
+        - Then, via a Critical Section (locking the rows of the Pool model) we 
queue tasks, and then send them
+          to the executor.
+
+          See docs of _critical_section_execute_task_instances for more.
+
+        :return: Number of TIs enqueued in this iteration
+        :rtype: int
+        """
+        try:
+            from sqlalchemy import event
+            expected_commit = False
+
+            # Put a check in place to make sure we don't commit unexpectedly
+            @event.listens_for(session.bind, 'commit')
+            def validate_commit(_):
+                nonlocal expected_commit
+                if expected_commit:
+                    expected_commit = False
+                    return
+                raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA 
LOCKS!")

Review comment:
       I don't think this is (easily) detectable with static checks -- for 
instance if you call a method with the `@provide_session` decorator and forget 
to pass it a `session=session` param, then you will end up committing by 
mistake.
   
   (I was a little bit surprised by this, as I thought it would be a new session, or perhaps use a different DB connection, but the events are bound to all sessions, not just one instance.)
   
   It feels like a lot of work, and not 100% reliable, to write a custom pylint 
check when an `if not self.unit_test_mode` would have the same effect.
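   
   A contrived sketch of that failure mode (the helpers here are made up for illustration, not code from this PR):
   
    ```python
    from airflow.models import TaskInstance
    from airflow.utils.session import provide_session
    from airflow.utils.state import State


    @provide_session
    def set_scheduled(ti: TaskInstance, session=None):
        ti.state = State.SCHEDULED
        session.merge(ti)
        # If provide_session had to create the session itself, it commits it here


    @provide_session
    def scheduling_step(session=None):
        ti = session.query(TaskInstance).first()
        # Forgot session=session: provide_session opens a *new* session and
        # commits it on return, mid "critical section"
        set_scheduled(ti)
        # Intended call: the caller stays in control of when to commit
        set_scheduled(ti, session=session)
    ```
   
   The engine-level listener still catches the first call at runtime, which is the point -- a static check would have a hard time seeing it.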
   

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1705,62 +1305,216 @@ def _run_scheduler_loop(self) -> None:
             loop_duration = loop_end_time - loop_start_time
             self.log.debug("Ran scheduling loop in %.2f seconds", 
loop_duration)
 
-            if not is_unit_test:
+            if not is_unit_test and not num_queued_tis and not 
num_finished_events:
+                # If the scheduler is doing things, don't sleep. This means 
when there is work to do, the
+                # scheduler will run "as quick as possible", but when it's 
stopped, it can sleep, dropping CPU
+                # usage when "idle"
                 time.sleep(self._processor_poll_interval)
 
-            if self.processor_agent.done:
+            if self.num_runs > 0 and loop_count >= self.num_runs and 
self.processor_agent.done:
                 self.log.info(
-                    "Exiting scheduler loop as all files have been processed 
%d times", self.num_runs
+                    "Exiting scheduler loop as requested number of runs (%d - 
got to %d) has been reached",
+                    self.num_runs, loop_count,
                 )
                 break
 
-    def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag) 
-> bool:
-        if simple_dag_bag.serialized_dags:
+    def _do_scheduling(self, session) -> int:
+        """
+        This function is where the main scheduling decisions take places. It:
+
+        - Creates any necessary DAG runs by examining the 
next_dagrun_create_after column of DagModel
+
+        - Finds the "next n oldest" running DAG Runs to examine for scheduling 
(n=20 by default) and tries to
+          progress state (TIs to SCHEDULED, or DagRuns to SUCCESS/FAILURE etc)
+
+          By "next oldest", we mean hasn't been examined/scheduled in the most 
time.
+
+        - Then, via a Critical Section (locking the rows of the Pool model) we 
queue tasks, and then send them
+          to the executor.
+
+          See docs of _critical_section_execute_task_instances for more.
+
+        :return: Number of TIs enqueued in this iteration
+        :rtype: int
+        """
+        try:
+            from sqlalchemy import event
+            expected_commit = False
+
+            # Put a check in place to make sure we don't commit unexpectedly
+            @event.listens_for(session.bind, 'commit')
+            def validate_commit(_):
+                nonlocal expected_commit
+                if expected_commit:
+                    expected_commit = False
+                    return
+                raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA 
LOCKS!")

Review comment:
       Thinking more about it, a simpler rule would be easier:
   
   If a function has `@provide_session`, it must not do `session.commit()` -- 
only `session.flush()` is allowed.
   
   My thinking here is:
   
   - If passed an existing session, the caller is responsible for committing 
when they choose
   - If not passed a session, then `@provide_session` will commit for us anyway.
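   
   In code terms, roughly (a hypothetical helper, just to illustrate the rule):
   
    ```python
    from airflow.models import DagRun
    from airflow.utils.session import provide_session
    from airflow.utils.state import State


    @provide_session
    def fail_runs(dag_id: str, session=None):
        runs = session.query(DagRun).filter(DagRun.dag_id == dag_id).all()
        for run in runs:
            run.state = State.FAILED
        # Allowed under the rule: pushes the UPDATEs inside the current
        # transaction without ending it
        session.flush()
        # Not allowed: if a caller passed in their session, this would end their
        # transaction (and release any row locks they hold)
        # session.commit()
    ```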



