turbaszek commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r488904165
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1707,53 +1343,119 @@ def _run_scheduler_loop(self) -> None:
)
break
- def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag) -> bool:
- if simple_dag_bag.serialized_dags:
- try:
- self._process_and_execute_tasks(simple_dag_bag)
- except Exception as e: # pylint: disable=broad-except
- self.log.error("Error queuing tasks")
- self.log.exception(e)
- return False
-
- # Call heartbeats
- self.log.debug("Heartbeating the executor")
- self.executor.heartbeat()
-
- self._change_state_for_tasks_failed_to_execute()
-
- # Process events from the executor
- self._process_executor_events(simple_dag_bag)
- return True
-
- def _process_and_execute_tasks(self, simple_dag_bag: SimpleDagBag) -> None:
- # Handle cases where a DAG run state is set (perhaps manually) to
- # a non-running state. Handle task instances that belong to
- # DAG runs in those states
- # If a task instance is up for retry but the corresponding DAG run
- # isn't running, mark the task instance as FAILED so we don't try
- # to re-run it.
- self._change_state_for_tis_without_dagrun(
- simple_dag_bag=simple_dag_bag,
- old_states=[State.UP_FOR_RETRY],
- new_state=State.FAILED
- )
- # If a task instance is scheduled or queued or up for reschedule,
- # but the corresponding DAG run isn't running, set the state to
- # NONE so we don't try to re-run it.
- self._change_state_for_tis_without_dagrun(
- simple_dag_bag=simple_dag_bag,
- old_states=[State.QUEUED,
- State.SCHEDULED,
- State.UP_FOR_RESCHEDULE,
- State.SENSING],
- new_state=State.NONE
- )
- self._execute_task_instances(simple_dag_bag)
+ def _scheduler_loop_critical_section(self, dag_bag, session) -> Union[int, _NoLockObtained]:
+ """
Review comment:
Could you please add a description to this docstring so that future readers
can understand the purpose of this method?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]