potiuk commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r492184032
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1705,62 +1305,216 @@ def _run_scheduler_loop(self) -> None:
loop_duration = loop_end_time - loop_start_time
self.log.debug("Ran scheduling loop in %.2f seconds",
loop_duration)
- if not is_unit_test:
+ if not is_unit_test and not num_queued_tis and not
num_finished_events:
+ # If the scheduler is doing things, don't sleep. This means
when there is work to do, the
+ # scheduler will run "as quick as possible", but when it's
stopped, it can sleep, dropping CPU
+ # usage when "idle"
time.sleep(self._processor_poll_interval)
- if self.processor_agent.done:
+ if self.num_runs > 0 and loop_count >= self.num_runs and
self.processor_agent.done:
self.log.info(
- "Exiting scheduler loop as all files have been processed
%d times", self.num_runs
+ "Exiting scheduler loop as requested number of runs (%d -
got to %d) has been reached",
+ self.num_runs, loop_count,
)
break
- def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag)
-> bool:
- if simple_dag_bag.serialized_dags:
+ def _do_scheduling(self, session) -> int:
+ """
+ This function is where the main scheduling decisions take places. It:
+
+ - Creates any necessary DAG runs by examining the
next_dagrun_create_after column of DagModel
+
+ - Finds the "next n oldest" running DAG Runs to examine for scheduling
(n=20 by default) and tries to
+ progress state (TIs to SCHEDULED, or DagRuns to SUCCESS/FAILURE etc)
+
+ By "next oldest", we mean hasn't been examined/scheduled in the most
time.
+
+ - Then, via a Critical Section (locking the rows of the Pool model) we
queue tasks, and then send them
+ to the executor.
+
+ See docs of _critical_section_execute_task_instances for more.
+
+ :return: Number of TIs enqueued in this iteration
+ :rtype: int
+ """
+ try:
+ from sqlalchemy import event
+ expected_commit = False
+
+ # Put a check in place to make sure we don't commit unexpectedly
+ @event.listens_for(session.bind, 'commit')
+ def validate_commit(_):
+ nonlocal expected_commit
+ if expected_commit:
+ expected_commit = False
+ return
+ raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA
LOCKS!")
Review comment:
Right. The context managers look much better here. I understand what you
are trying to do. Indeed, we'll have to think about some way of protecting it
at static-check time rather than at runtime. Since we aren't doing any
magical/dynamic stuff here, maybe we could write a custom pylint check, add
a @nocommit decorator to functions, and then enforce these rules:
- @nocommit methods can only call other @nocommit methods
- @nocommit methods cannot have sessions created for them - the session must be provided by the caller
- @nocommit methods cannot call commit()
- @nocommit methods cannot create their own sessions
Is there any other way to issue a commit() other than closing an
automatically created session or calling commit() explicitly? If not, it
might be much better to check this statically rather than at runtime.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]