ashb commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r491323370
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1705,62 +1305,216 @@ def _run_scheduler_loop(self) -> None:
loop_duration = loop_end_time - loop_start_time
self.log.debug("Ran scheduling loop in %.2f seconds",
loop_duration)
- if not is_unit_test:
+ if not is_unit_test and not num_queued_tis and not
num_finished_events:
+ # If the scheduler is doing things, don't sleep. This means
when there is work to do, the
+ # scheduler will run "as quick as possible", but when it's
stopped, it can sleep, dropping CPU
+ # usage when "idle"
time.sleep(self._processor_poll_interval)
- if self.processor_agent.done:
+ if self.num_runs > 0 and loop_count >= self.num_runs and
self.processor_agent.done:
self.log.info(
- "Exiting scheduler loop as all files have been processed
%d times", self.num_runs
+ "Exiting scheduler loop as requested number of runs (%d -
got to %d) has been reached",
+ self.num_runs, loop_count,
)
break
- def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag)
-> bool:
- if simple_dag_bag.serialized_dags:
+ def _do_scheduling(self, session) -> int:
+ """
+ This function is where the main scheduling decisions take place. It:
+
+ - Creates any necessary DAG runs by examining the
next_dagrun_create_after column of DagModel
+
+ - Finds the "next n oldest" running DAG Runs to examine for scheduling
(n=20 by default) and tries to
+ progress state (TIs to SCHEDULED, or DagRuns to SUCCESS/FAILURE etc)
+
+ By "next oldest", we mean the DAG Runs that have gone the longest without being examined/scheduled.
+
+ - Then, via a Critical Section (locking the rows of the Pool model) we
queue tasks, and then send them
+ to the executor.
+
+ See docs of _critical_section_execute_task_instances for more.
+
+ :return: Number of TIs enqueued in this iteration
+ :rtype: int
+ """
+ try:
+ from sqlalchemy import event
+ expected_commit = False
+
+ # Put a check in place to make sure we don't commit unexpectedly
+ @event.listens_for(session.bind, 'commit')
+ def validate_commit(_):
+ nonlocal expected_commit
+ if expected_commit:
+ expected_commit = False
+ return
+ raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA
LOCKS!")
+
+ query = DagModel.dags_needing_dagruns(session)
+ for dag_model in query:
+ dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
+ self._create_dag_run(dag_model, dag, session)
+
+ # commit the session - Release the write lock on DagModel table.
+ expected_commit = True
+ session.commit()
+ # END: create dagruns
+
+ for dag_run in DagRun.next_dagruns_to_examine(session):
+ self._schedule_dag_run(dag_run, session)
+
+ expected_commit = True
+ session.commit()
+
+ # Without this, the session has an invalid view of the DB
+ session.expunge_all()
+ # END: schedule TIs
+
+ # TODO[HA]: Do we need to call
+ # _change_state_for_tis_without_dagrun (2x) that we were before
+ # to tidy up manually tweaked TIs. Do we need to do it every
+ # time?
+
try:
- self._process_and_execute_tasks(simple_dag_bag)
- except Exception as e: # pylint: disable=broad-except
- self.log.error("Error queuing tasks")
- self.log.exception(e)
- return False
-
- # Call heartbeats
- self.log.debug("Heartbeating the executor")
- self.executor.heartbeat()
-
- self._change_state_for_tasks_failed_to_execute()
-
- # Process events from the executor
- self._process_executor_events(simple_dag_bag)
- return True
-
- def _process_and_execute_tasks(self, simple_dag_bag: SimpleDagBag) -> None:
- # Handle cases where a DAG run state is set (perhaps manually) to
- # a non-running state. Handle task instances that belong to
- # DAG runs in those states
- # If a task instance is up for retry but the corresponding DAG run
- # isn't running, mark the task instance as FAILED so we don't try
- # to re-run it.
- self._change_state_for_tis_without_dagrun(
- simple_dag_bag=simple_dag_bag,
- old_states=[State.UP_FOR_RETRY],
- new_state=State.FAILED
- )
- # If a task instance is scheduled or queued or up for reschedule,
- # but the corresponding DAG run isn't running, set the state to
- # NONE so we don't try to re-run it.
- self._change_state_for_tis_without_dagrun(
- simple_dag_bag=simple_dag_bag,
- old_states=[State.QUEUED,
- State.SCHEDULED,
- State.UP_FOR_RESCHEDULE,
- State.SENSING],
- new_state=State.NONE
+ if self.executor.slots_available <= 0:
+ # We know we can't do anything here, so don't even try!
+ self.log.debug("Executor full, skipping critical section")
+ return 0
+
+ timer = Stats.timer('scheduler.critical_section_duration')
+ timer.start()
+
+ # Find any TIs in state SCHEDULED, and try to QUEUE them (send them to the executor)
+ num_queued_tis =
self._critical_section_execute_task_instances(session=session)
+
+ # Make sure we only send this metric if we obtained the lock; otherwise we'll skew the
+ # metric way down
+ timer.stop(send=True)
+ except OperationalError as e:
+ timer.stop(send=False)
+
+ # DB specific error codes:
+ # Postgres: 55P03
+ # MySQL: 3572, 'Statement aborted because lock(s) could not be
acquired immediately and NOWAIT
+ # is set.'
+ # MySQL: 1205, 'Lock wait timeout exceeded; try restarting transaction'
+ # (when NOWAIT isn't available)
+ db_err_code = getattr(e.orig, 'pgcode', None) or e.orig.args[0]
+
+ # We could test if e.orig is an instance of
+ # psycopg2.errors.LockNotAvailable / _mysql_exceptions.OperationalError,
+ # but that involves importing it; checking the error code doesn't.
+ if db_err_code in ('55P03', 1205, 3572):
+ self.log.debug("Critical section lock held by another
Scheduler")
+ Stats.incr('scheduler.critical_section_busy')
+ return 0
+ raise
+
+ return num_queued_tis
+ finally:
+ event.remove(session.bind, 'commit', validate_commit)
+
+ def _create_dag_run(self, dag_model: DagModel, dag: DAG, session: Session)
-> None:
+ """
+ Unconditionally create a DAG run for the given DAG, and update the
dag_model's fields to control
Review comment:
Unconditional here because the checks are done elsewhere, at the caller.
I'll double-check the case you mention — I think it's already covered in the
unit tests too.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]