ashb commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r489284780
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1707,53 +1343,119 @@ def _run_scheduler_loop(self) -> None:
)
break
- def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag) -> bool:
- if simple_dag_bag.serialized_dags:
- try:
- self._process_and_execute_tasks(simple_dag_bag)
- except Exception as e: # pylint: disable=broad-except
- self.log.error("Error queuing tasks")
- self.log.exception(e)
- return False
-
- # Call heartbeats
- self.log.debug("Heartbeating the executor")
- self.executor.heartbeat()
-
- self._change_state_for_tasks_failed_to_execute()
-
- # Process events from the executor
- self._process_executor_events(simple_dag_bag)
- return True
-
- def _process_and_execute_tasks(self, simple_dag_bag: SimpleDagBag) -> None:
- # Handle cases where a DAG run state is set (perhaps manually) to
- # a non-running state. Handle task instances that belong to
- # DAG runs in those states
- # If a task instance is up for retry but the corresponding DAG run
- # isn't running, mark the task instance as FAILED so we don't try
- # to re-run it.
- self._change_state_for_tis_without_dagrun(
- simple_dag_bag=simple_dag_bag,
- old_states=[State.UP_FOR_RETRY],
- new_state=State.FAILED
- )
- # If a task instance is scheduled or queued or up for reschedule,
- # but the corresponding DAG run isn't running, set the state to
- # NONE so we don't try to re-run it.
- self._change_state_for_tis_without_dagrun(
- simple_dag_bag=simple_dag_bag,
- old_states=[State.QUEUED,
- State.SCHEDULED,
- State.UP_FOR_RESCHEDULE,
- State.SENSING],
- new_state=State.NONE
- )
- self._execute_task_instances(simple_dag_bag)
+ def _scheduler_loop_critical_section(self, dag_bag, session) -> Union[int, _NoLockObtained]:
+ """
Review comment:
Yes -- I might also rename this method, as "critical section" has a specific
meaning:

> In concurrent programming, concurrent accesses to shared resources can
> lead to unexpected or erroneous behavior, so parts of the program where the
> shared resource is accessed need to be protected in ways that avoid the
> concurrent access. This protected section is the critical section or critical
> region.

But this entire function isn't critical any longer, just the execute part at
the end.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]