mik-laj commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r499552738



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1706,62 +1342,326 @@ def _run_scheduler_loop(self) -> None:
             loop_duration = loop_end_time - loop_start_time
             self.log.debug("Ran scheduling loop in %.2f seconds", loop_duration)
 
-            if not is_unit_test:
+            if not is_unit_test and not num_queued_tis and not num_finished_events:
+                # If the scheduler is doing things, don't sleep. This means when there is work to do, the
+                # scheduler will run "as quick as possible", but when it's stopped, it can sleep, dropping CPU
+                # usage when "idle"
                 time.sleep(self._processor_poll_interval)
 
+            if self.num_runs > 0 and loop_count >= self.num_runs:
+                self.log.info(
+                    "Exiting scheduler loop as requested number of runs (%d - got to %d) has been reached",
+                    self.num_runs, loop_count,
+                )
+                break
             if self.processor_agent.done:
                 self.log.info(
-                    "Exiting scheduler loop as all files have been processed %d times", self.num_runs
+                    "Exiting scheduler loop as requested DAG parse count (%d) has been reached after %d "
+                    " scheduler loops",
+                    self.num_times_parse_dags, loop_count,
                 )
                 break
 
-    def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag) -> bool:
-        if simple_dag_bag.serialized_dags:
+    def _do_scheduling(self, session) -> int:
+        """
+        This function is where the main scheduling decisions take place. It:
+
+        - Creates any necessary DAG runs by examining the next_dagrun_create_after column of DagModel
+
+        - Finds the "next n oldest" running DAG Runs to examine for scheduling (n=20 by default) and tries to
+          progress state (TIs to SCHEDULED, or DagRuns to SUCCESS/FAILURE etc)
+
+          By "next oldest", we mean the runs that haven't been examined/scheduled for the longest time.
+
+        - Then, via a Critical Section (locking the rows of the Pool model) we queue tasks, and then send them
+          to the executor.
+
+          See docs of _critical_section_execute_task_instances for more.
+
+        :return: Number of TIs enqueued in this iteration
+        :rtype: int
+        """
+        # Put a check in place to make sure we don't commit unexpectedly
+        with prohibit_commit(session) as guard:
+
+            query = DagModel.dags_needing_dagruns(session)
+            self._create_dag_runs(query.all(), session)
+
+            # commit the session - Release the write lock on DagModel table.
+            guard.commit()
+            # END: create dagruns
+
+            dag_runs = DagRun.next_dagruns_to_examine(session)
+
+            # Bulk fetch the currently active dag runs for the dags we are
+            # examining, rather than making one query per DagRun
+
+            # TODO: This query is probably horribly inefficient (though there is an
+            # index on (dag_id,state)). It is to deal with the case when a user
+            # clears more than max_active_runs older tasks -- we don't want the
+            # scheduler to suddenly go and start running tasks from all of the
+            # runs. (AIRFLOW-137/GH #1442)
+            #
+            # The longer term fix would be to have `clear` do this, and put DagRuns
+            # into the queued state, then take DRs out of queued before creating
+            # any new ones
+            # TODO[HA]: Why is this on TI, not on DagRun??
+            currently_active_runs = dict(session.query(
+                TI.dag_id,
+                func.count(TI.execution_date.distinct()),
+            ).filter(
+                TI.dag_id.in_(list({dag_run.dag_id for dag_run in dag_runs})),
+                TI.state.notin_(State.finished())
+            ).group_by(TI.dag_id).all())
+
+            for dag_run in dag_runs:
+                self._schedule_dag_run(dag_run, currently_active_runs.get(dag_run.dag_id, 0), session)
+
+            guard.commit()
+
+            # Without this, the session has an invalid view of the DB
+            session.expunge_all()
+            # END: schedule TIs
+
+            # TODO[HA]: Do we need to call
+            # _change_state_for_tis_without_dagrun (2x) as we did before
+            # to tidy up manually tweaked TIs? Do we need to do it every
+            # time?
+
             try:
-                self._process_and_execute_tasks(simple_dag_bag)
-            except Exception as e:  # pylint: disable=broad-except
-                self.log.error("Error queuing tasks")
-                self.log.exception(e)
-                return False
-
-        # Call heartbeats
-        self.log.debug("Heartbeating the executor")
-        self.executor.heartbeat()
-
-        self._change_state_for_tasks_failed_to_execute()
-
-        # Process events from the executor
-        self._process_executor_events(simple_dag_bag)
-        return True
-
-    def _process_and_execute_tasks(self, simple_dag_bag: SimpleDagBag) -> None:
-        # Handle cases where a DAG run state is set (perhaps manually) to
-        # a non-running state. Handle task instances that belong to
-        # DAG runs in those states
-        # If a task instance is up for retry but the corresponding DAG run
-        # isn't running, mark the task instance as FAILED so we don't try
-        # to re-run it.
-        self._change_state_for_tis_without_dagrun(
-            simple_dag_bag=simple_dag_bag,
-            old_states=[State.UP_FOR_RETRY],
-            new_state=State.FAILED
-        )
-        # If a task instance is scheduled or queued or up for reschedule,
-        # but the corresponding DAG run isn't running, set the state to
-        # NONE so we don't try to re-run it.
-        self._change_state_for_tis_without_dagrun(
-            simple_dag_bag=simple_dag_bag,
-            old_states=[State.QUEUED,
-                        State.SCHEDULED,
-                        State.UP_FOR_RESCHEDULE,
-                        State.SENSING],
-            new_state=State.NONE
+                if self.executor.slots_available <= 0:
+                    # We know we can't do anything here, so don't even try!
+                    self.log.debug("Executor full, skipping critical section")
+                    return 0
+
+                timer = Stats.timer('scheduler.critical_section_duration')
+                timer.start()
+
+                # Find any TIs in state SCHEDULED, try to QUEUE them (send them to the executor)
+                num_queued_tis = self._critical_section_execute_task_instances(session=session)
+
+                # Make sure we only send this metric if we obtained the lock, otherwise we'll skew the
+                # metric way down
+                timer.stop(send=True)
+            except OperationalError as e:
+                timer.stop(send=False)
+
+                # DB specific error codes:
+                # Postgres: 55P03
+                # MySQL: 3572, 'Statement aborted because lock(s) could not be acquired immediately and NOWAIT
+                #               is set.'
+                # MySQL: 1205, 'Lock wait timeout exceeded; try restarting transaction
+                #              (when NOWAIT isn't available)
+                db_err_code = getattr(e.orig, 'pgcode', None) or e.orig.args[0]
+
+                # We could test if e.orig is an instance of
+                # psycopg2.errors.LockNotAvailable/_mysql_exceptions.OperationalError, but that involves
+                # importing it. This doesn't
+                if db_err_code in ('55P03', 1205, 3572):
+                    self.log.debug("Critical section lock held by another Scheduler")
+                    Stats.incr('scheduler.critical_section_busy')
+                    session.rollback()
+                    return 0
+                raise
+
+            return num_queued_tis
+
+    def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) -> None:
+        """
+        Unconditionally create a DAG run for the given DAG, and update the dag_model's fields to control
+        if/when the next DAGRun should be created
+        """
+        for dag_model in dag_models:
+            dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
+            dag_hash = self.dagbag.dags_hash.get(dag.dag_id, None)
+            dag.create_dagrun(
+                run_type=DagRunType.SCHEDULED,
+                execution_date=dag_model.next_dagrun,
+                start_date=timezone.utcnow(),
+                state=State.RUNNING,
+                external_trigger=False,
+                session=session,
+                dag_hash=dag_hash
+            )
+
+        self._update_dag_next_dagruns(dag_models, session)
+
+        # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
+        # memory for larger dags? or expunge_all()
+
+    def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], session: Session) -> None:
+        """
+        Bulk update the next_dagrun and next_dagrun_create_after for all the dags.
+
+        We batch the select queries to get info about all the dags at once
+        """
+        # Check max_active_runs, to see if we are _now_ at the limit for any of
+        # these dags (we've just created a DagRun for them, after all)
+        active_runs_of_dags = dict(session.query(DagRun.dag_id, func.count('*')).filter(
+            DagRun.dag_id.in_([o.dag_id for o in dag_models]),
+            DagRun.state == State.RUNNING,  # pylint: disable=comparison-with-callable
+            DagRun.external_trigger.is_(False),
+        ).group_by(DagRun.dag_id).all())
+
+        for dag_model in dag_models:
+            dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
+            active_runs_of_dag = active_runs_of_dags.get(dag.dag_id, 0)
+            if dag.max_active_runs and active_runs_of_dag >= dag.max_active_runs:
+                self.log.info(
+                    "DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
+                    dag.dag_id, active_runs_of_dag, dag.max_active_runs
+                )
+                dag_model.next_dagrun_create_after = None
+            else:
+                dag_model.next_dagrun, dag_model.next_dagrun_create_after = \
+                    dag.next_dagrun_info(dag_model.next_dagrun)
+
+    def _schedule_dag_run(self, dag_run: DagRun, currently_active_runs: int, session: Session) -> int:
+        """
+        Make scheduling decisions about an individual dag run
+
+        ``currently_active_runs`` is passed in so that a batch query can be
+        used to ask this for all dag runs in the batch, to avoid an n+1 query.
+
+        :param dag_run: The DagRun to schedule
+        :param currently_active_runs: Number of currently active runs of this DAG
+        :return: Number of tasks scheduled
+        """
+        dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
+
+        if not dag:
+            self.log.error(
+                "Couldn't find dag %s in DagBag/DB!", dag_run.dag_id
+            )
+            return 0
+
+        if (
+            dag_run.start_date and dag.dagrun_timeout and
+            dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
+        ):
+            dag_run.state = State.FAILED
+            dag_run.end_date = timezone.utcnow()
+            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
+            session.flush()
+
+            # Work out if we should allow creating a new DagRun now?
+            self._update_dag_next_dagruns([session.query(DagModel).get(dag_run.dag_id)], session)
+
+            dag_run.callback = DagCallbackRequest(
+                full_filepath=dag.fileloc,
+                dag_id=dag.dag_id,
+                execution_date=dag_run.execution_date,
+                is_failure_callback=True,
+                msg='timed_out'
+            )
+
+            # Send SLA & DAG Success/Failure Callbacks to be executed
+            self._send_dag_callbacks_to_processor(dag_run)
+
+            return 0
+
+        if dag_run.execution_date > timezone.utcnow() and not dag.allow_future_exec_dates:
+            self.log.error(
+                "Execution date is in future: %s",
+                dag_run.execution_date
+            )
+            return 0
+
+        if dag.max_active_runs:
+            if currently_active_runs >= dag.max_active_runs:
+                self.log.info(
+                    "DAG %s already has %d active runs, not queuing any more tasks",
+                    dag.dag_id,
+                    currently_active_runs,
+                )
+                return 0
+
+        self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session)
+        # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
+        schedulable_tis = dag_run.update_state(session=session, execute_callbacks=False)
+        # TODO[HA]: Don't return, update these from in update_state?
+
+        # Get list of TIs that do not need to be executed, these are
+        # tasks using DummyOperator and without on_execute_callback / on_success_callback
+        dummy_tis = [
+            ti for ti in schedulable_tis
+            if
+            (
+                ti.task.task_type == "DummyOperator"
+                and not ti.task.on_execute_callback
+                and not ti.task.on_success_callback
+            )
+        ]
+
+        # This will do one query per dag run. We "could" build up a complex
+        # query to update all the TIs across all the execution dates and dag
+        # IDs in a single query, but it turns out that can be _very very slow_
+        # see #11147/commit ee90807ac for more details
+        count = session.query(TI).filter(
+            TI.dag_id == dag_run.dag_id,
+            TI.execution_date == dag_run.execution_date,
+            TI.task_id.in_(ti.task_id for ti in schedulable_tis if ti not in dummy_tis)
+        ).update({TI.state: State.SCHEDULED}, synchronize_session=False)
+
+        # Tasks using DummyOperator should not be executed, mark them as success
+        if dummy_tis:
+            session.query(TI).filter(
+                TI.dag_id == dag_run.dag_id,
+                TI.execution_date == dag_run.execution_date,
+                TI.task_id.in_(ti.task_id for ti in dummy_tis)
+            ).update({
+                TI.state: State.SUCCESS,
+                TI.start_date: timezone.utcnow(),
+                TI.end_date: timezone.utcnow(),
+                TI.duration: 0
+            }, synchronize_session=False)
+
+        return count
+
+    @provide_session
+    def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session=None):
+        """Only run DagRun.verify_integrity if the Serialized DAG has changed, since it is slow"""
+        latest_version = SerializedDagModel.get_latest_version_hash(dag_run.dag_id, session=session)
+        if dag_run.dag_hash == latest_version:
+            self.log.debug("DAG %s not changed structure, skipping dagrun.verify_integrity", dag_run.dag_id)
+            return
+
+        dag_run.dag_hash = latest_version
+
+        # Refresh the DAG
+        dag_run.dag = self.dagbag.get_dag(dag_id=dag_run.dag_id, session=session)
+
+        # Verify integrity also takes care of session.flush
+        dag_run.verify_integrity(session=session)
+
+    def _send_dag_callbacks_to_processor(self, dag_run: DagRun):
+        if not self.processor_agent:
+            raise ValueError("Processor agent is not started.")
+
+        dag = dag_run.get_dag()
+        self._manage_slas(dag)
+        if dag_run.callback:
+            self.processor_agent.send_callback_to_execute(dag_run.callback)
+
+    def _manage_slas(self, dag: DAG):

Review comment:
       Is this name still correct? It seems to me that its content has changed drastically, while the name has stayed essentially the same.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

