potiuk commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r492048248
##########
File path: airflow/jobs/backfill_job.py
##########
@@ -629,6 +629,7 @@ def _per_task_process(task, key, ti, session=None): #
pylint: disable=too-many-
_dag_runs = ti_status.active_runs[:]
for run in _dag_runs:
run.update_state(session=session)
+ session.merge(run)
Review comment:
Can this be a separate PR?
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1179,39 +832,50 @@ def __get_concurrency_maps(
# pylint: disable=too-many-locals,too-many-statements
@provide_session
- def _find_executable_task_instances(
- self,
- simple_dag_bag: SimpleDagBag,
- session: Session = None
- ) -> List[TI]:
+ def _executable_task_instances_to_queued(self, max_tis: int, session:
Session = None) -> List[TI]:
"""
Finds TIs that are ready for execution with respect to pool limits,
dag concurrency, executor state, and priority.
- :param simple_dag_bag: TaskInstances associated with DAGs in the
- simple_dag_bag will be fetched from the DB and executed
- :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
+ :param max_tis: Maximum number of TIs to queue in this loop.
+ :type max_tis: int
:return: list[airflow.models.TaskInstance]
"""
executable_tis: List[TI] = []
+ # Get the pool settings. We get a lock on the pool rows, treating this
as a "critical section"
+ # Throws an exception if lock cannot be obtained, rather than blocking
+ pools = models.Pool.slots_stats(with_for_update=nowait(session),
session=session)
+
+ # If the pools are full, there is no point doing anything!
+ # If _somehow_ the pool is overfull, don't let the limit go negative -
it breaks SQL
+ pool_slots_free = max(0, sum(map(operator.itemgetter('open'),
pools.values())))
+
+ if pool_slots_free == 0:
+ self.log.debug("All pools are full!")
+ return executable_tis
+
+ max_tis = min(max_tis, pool_slots_free)
+
# Get all task instances associated with scheduled
# DagRuns which are not backfilled, in the given states,
# and the dag is not paused
task_instances_to_examine: List[TI] = (
session
.query(TI)
- .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
- .outerjoin(
- DR, and_(DR.dag_id == TI.dag_id, DR.execution_date ==
TI.execution_date)
- )
- .filter(or_(DR.run_id.is_(None), DR.run_type !=
DagRunType.BACKFILL_JOB.value))
- .outerjoin(DM, DM.dag_id == TI.dag_id)
- .filter(or_(DM.dag_id.is_(None), not_(DM.is_paused)))
+ .outerjoin(TI.dag_run)
+ .filter(or_(DR.run_id.is_(None),
+ DR.run_type != DagRunType.BACKFILL_JOB.value))
+ .join(TI.dag_model)
+ .filter(not_(DM.is_paused))
.filter(TI.state == State.SCHEDULED)
+ .options(selectinload('dag_model'))
+ .limit(max_tis)
+ .with_for_update(of=TI, **skip_locked(session=session))
Review comment:
Seems that "with_for_update of " is missing in 5.7? Seems that in this
case TI Aand DR tables both will be locked in this case? I guess it is
something that we should also worry about? Seems that in this case we might hit
some limitations on parallelism in 5.7.
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1052,6 +703,8 @@ def __init__(
self.max_tis_per_query: int = conf.getint('scheduler',
'max_tis_per_query')
self.processor_agent: Optional[DagFileProcessorAgent] = None
+ self.dagbag = DagBag(read_dags_from_db=True)
Review comment:
Should we separate the DagBag (read_dags_from_db=True) out from the
DagBag functionality regarding file reading ? Seems that
DagBag(read_dags_from_db=True) is a pass-trought to retrieve Dag via it's Id in
a lazy way?
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1705,62 +1305,216 @@ def _run_scheduler_loop(self) -> None:
loop_duration = loop_end_time - loop_start_time
self.log.debug("Ran scheduling loop in %.2f seconds",
loop_duration)
- if not is_unit_test:
+ if not is_unit_test and not num_queued_tis and not
num_finished_events:
+ # If the scheduler is doing things, don't sleep. This means
when there is work to do, the
+ # scheduler will run "as quick as possible", but when it's
stopped, it can sleep, dropping CPU
+ # usage when "idle"
time.sleep(self._processor_poll_interval)
- if self.processor_agent.done:
+ if self.num_runs > 0 and loop_count >= self.num_runs and
self.processor_agent.done:
self.log.info(
- "Exiting scheduler loop as all files have been processed
%d times", self.num_runs
+ "Exiting scheduler loop as requested number of runs (%d -
got to %d) has been reached",
+ self.num_runs, loop_count,
)
break
- def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag)
-> bool:
- if simple_dag_bag.serialized_dags:
+ def _do_scheduling(self, session) -> int:
+ """
+ This function is where the main scheduling decisions take places. It:
+
+ - Creates any necessary DAG runs by examining the
next_dagrun_create_after column of DagModel
+
+ - Finds the "next n oldest" running DAG Runs to examine for scheduling
(n=20 by default) and tries to
+ progress state (TIs to SCHEDULED, or DagRuns to SUCCESS/FAILURE etc)
+
+ By "next oldest", we mean hasn't been examined/scheduled in the most
time.
+
+ - Then, via a Critical Section (locking the rows of the Pool model) we
queue tasks, and then send them
+ to the executor.
+
+ See docs of _critical_section_execute_task_instances for more.
+
+ :return: Number of TIs enqueued in this iteration
+ :rtype: int
+ """
+ try:
+ from sqlalchemy import event
+ expected_commit = False
+
+ # Put a check in place to make sure we don't commit unexpectedly
+ @event.listens_for(session.bind, 'commit')
+ def validate_commit(_):
+ nonlocal expected_commit
+ if expected_commit:
+ expected_commit = False
+ return
+ raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA
LOCKS!")
Review comment:
Is this something we plan to keep? We seem to catch Exceptions in
various places - do we want to actually kill scheduler in this case or just log
an error ?
##########
File path: airflow/executors/base_executor.py
##########
@@ -69,16 +70,16 @@ def start(self): # pragma: no cover
"""
def queue_command(self,
- simple_task_instance: SimpleTaskInstance,
Review comment:
Good one!
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1705,62 +1305,216 @@ def _run_scheduler_loop(self) -> None:
loop_duration = loop_end_time - loop_start_time
self.log.debug("Ran scheduling loop in %.2f seconds",
loop_duration)
- if not is_unit_test:
+ if not is_unit_test and not num_queued_tis and not
num_finished_events:
+ # If the scheduler is doing things, don't sleep. This means
when there is work to do, the
+ # scheduler will run "as quick as possible", but when it's
stopped, it can sleep, dropping CPU
+ # usage when "idle"
time.sleep(self._processor_poll_interval)
- if self.processor_agent.done:
+ if self.num_runs > 0 and loop_count >= self.num_runs and
self.processor_agent.done:
self.log.info(
- "Exiting scheduler loop as all files have been processed
%d times", self.num_runs
+ "Exiting scheduler loop as requested number of runs (%d -
got to %d) has been reached",
+ self.num_runs, loop_count,
)
break
- def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag)
-> bool:
- if simple_dag_bag.serialized_dags:
+ def _do_scheduling(self, session) -> int:
+ """
+ This function is where the main scheduling decisions take places. It:
+
+ - Creates any necessary DAG runs by examining the
next_dagrun_create_after column of DagModel
+
+ - Finds the "next n oldest" running DAG Runs to examine for scheduling
(n=20 by default) and tries to
+ progress state (TIs to SCHEDULED, or DagRuns to SUCCESS/FAILURE etc)
+
+ By "next oldest", we mean hasn't been examined/scheduled in the most
time.
+
+ - Then, via a Critical Section (locking the rows of the Pool model) we
queue tasks, and then send them
+ to the executor.
+
+ See docs of _critical_section_execute_task_instances for more.
+
+ :return: Number of TIs enqueued in this iteration
+ :rtype: int
+ """
+ try:
+ from sqlalchemy import event
+ expected_commit = False
+
+ # Put a check in place to make sure we don't commit unexpectedly
+ @event.listens_for(session.bind, 'commit')
+ def validate_commit(_):
+ nonlocal expected_commit
+ if expected_commit:
+ expected_commit = False
+ return
+ raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA
LOCKS!")
Review comment:
Right. The context managers look much better here. I understand what you
are trying to do. Indeed wee'll have to think about some way of protecting it
at static check time rather than at runtime. Since we aren't doing any
magical/dynamic stuff here, maybe we could do some custom pylint check and add
a @no-commit decorator to functions and then we do this check:
- @nocommit methods can only call @nocommit methods
- @nocommit methods cannot have sessions created - they must be provided
- @nocommit method cannot call commit()
- @nocommit method cannot create their own sessions
Is there any other way to issue a commit() - other than auto-closing an
automated session and explicit commit(). If not - that might be much better to
check it at static check rather than compile time.
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1179,39 +832,50 @@ def __get_concurrency_maps(
# pylint: disable=too-many-locals,too-many-statements
@provide_session
- def _find_executable_task_instances(
- self,
- simple_dag_bag: SimpleDagBag,
- session: Session = None
- ) -> List[TI]:
+ def _executable_task_instances_to_queued(self, max_tis: int, session:
Session = None) -> List[TI]:
"""
Finds TIs that are ready for execution with respect to pool limits,
dag concurrency, executor state, and priority.
- :param simple_dag_bag: TaskInstances associated with DAGs in the
- simple_dag_bag will be fetched from the DB and executed
- :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
+ :param max_tis: Maximum number of TIs to queue in this loop.
+ :type max_tis: int
:return: list[airflow.models.TaskInstance]
"""
executable_tis: List[TI] = []
+ # Get the pool settings. We get a lock on the pool rows, treating this
as a "critical section"
+ # Throws an exception if lock cannot be obtained, rather than blocking
+ pools = models.Pool.slots_stats(with_for_update=nowait(session),
session=session)
+
+ # If the pools are full, there is no point doing anything!
+ # If _somehow_ the pool is overfull, don't let the limit go negative -
it breaks SQL
+ pool_slots_free = max(0, sum(map(operator.itemgetter('open'),
pools.values())))
+
+ if pool_slots_free == 0:
+ self.log.debug("All pools are full!")
+ return executable_tis
+
+ max_tis = min(max_tis, pool_slots_free)
+
# Get all task instances associated with scheduled
# DagRuns which are not backfilled, in the given states,
# and the dag is not paused
task_instances_to_examine: List[TI] = (
session
.query(TI)
- .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
- .outerjoin(
- DR, and_(DR.dag_id == TI.dag_id, DR.execution_date ==
TI.execution_date)
- )
- .filter(or_(DR.run_id.is_(None), DR.run_type !=
DagRunType.BACKFILL_JOB.value))
- .outerjoin(DM, DM.dag_id == TI.dag_id)
- .filter(or_(DM.dag_id.is_(None), not_(DM.is_paused)))
+ .outerjoin(TI.dag_run)
+ .filter(or_(DR.run_id.is_(None),
+ DR.run_type != DagRunType.BACKFILL_JOB.value))
+ .join(TI.dag_model)
+ .filter(not_(DM.is_paused))
.filter(TI.state == State.SCHEDULED)
+ .options(selectinload('dag_model'))
+ .limit(max_tis)
+ .with_for_update(of=TI, **skip_locked(session=session))
Review comment:
I see. I still have to wrap my head around the locking consequences
here. I understand that here the only locked tables will be TI and DR. the DM
is "selectinload" so likely not locked.
I really just want to understand the consequences and risks. I think a the
end we should come up with some "cautious" recommendations to the users. After
looking at the potential issues and gains, my current feeling (but needs a bit
deeper thinking and testing) is that
* MySQL 5.7 -> go with 1 scheduler (and escape hatch if possible/easy to
implement). Experiment if you'r adventurous, but expect moderate gains and
higher risks of concurrently related problems
* MySQL 8 -> should work fine and bring expected gains, with much lower risk
of concurrency problems.
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1179,39 +832,50 @@ def __get_concurrency_maps(
# pylint: disable=too-many-locals,too-many-statements
@provide_session
- def _find_executable_task_instances(
- self,
- simple_dag_bag: SimpleDagBag,
- session: Session = None
- ) -> List[TI]:
+ def _executable_task_instances_to_queued(self, max_tis: int, session:
Session = None) -> List[TI]:
"""
Finds TIs that are ready for execution with respect to pool limits,
dag concurrency, executor state, and priority.
- :param simple_dag_bag: TaskInstances associated with DAGs in the
- simple_dag_bag will be fetched from the DB and executed
- :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
+ :param max_tis: Maximum number of TIs to queue in this loop.
+ :type max_tis: int
:return: list[airflow.models.TaskInstance]
"""
executable_tis: List[TI] = []
+ # Get the pool settings. We get a lock on the pool rows, treating this
as a "critical section"
+ # Throws an exception if lock cannot be obtained, rather than blocking
+ pools = models.Pool.slots_stats(with_for_update=nowait(session),
session=session)
+
+ # If the pools are full, there is no point doing anything!
+ # If _somehow_ the pool is overfull, don't let the limit go negative -
it breaks SQL
+ pool_slots_free = max(0, sum(map(operator.itemgetter('open'),
pools.values())))
+
+ if pool_slots_free == 0:
+ self.log.debug("All pools are full!")
+ return executable_tis
+
+ max_tis = min(max_tis, pool_slots_free)
+
# Get all task instances associated with scheduled
# DagRuns which are not backfilled, in the given states,
# and the dag is not paused
task_instances_to_examine: List[TI] = (
session
.query(TI)
- .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
- .outerjoin(
- DR, and_(DR.dag_id == TI.dag_id, DR.execution_date ==
TI.execution_date)
- )
- .filter(or_(DR.run_id.is_(None), DR.run_type !=
DagRunType.BACKFILL_JOB.value))
- .outerjoin(DM, DM.dag_id == TI.dag_id)
- .filter(or_(DM.dag_id.is_(None), not_(DM.is_paused)))
+ .outerjoin(TI.dag_run)
+ .filter(or_(DR.run_id.is_(None),
+ DR.run_type != DagRunType.BACKFILL_JOB.value))
+ .join(TI.dag_model)
+ .filter(not_(DM.is_paused))
.filter(TI.state == State.SCHEDULED)
+ .options(selectinload('dag_model'))
+ .limit(max_tis)
+ .with_for_update(of=TI, **skip_locked(session=session))
Review comment:
I see. I still have to wrap my head around the locking consequences
here. I understand that here the only locked tables will be TI and DR. the DM
is "selectinload" so likely not locked.
I really just want to understand the consequences and risks. I think a the
end we should come up with some "cautious" recommendations to the users. After
looking at the potential issues and gains, my current feeling (but needs a bit
deeper thinking and testing) is that
* MySQL 5.7 -> go with 1 scheduler (and escape hatch if possible/easy to
implement). Experiment if you're adventurous, but expect moderate gains and
higher risks of concurrently related problems
* MySQL 8 -> should work fine and bring expected gains, with much lower risk
of concurrency problems.
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1179,39 +832,50 @@ def __get_concurrency_maps(
# pylint: disable=too-many-locals,too-many-statements
@provide_session
- def _find_executable_task_instances(
- self,
- simple_dag_bag: SimpleDagBag,
- session: Session = None
- ) -> List[TI]:
+ def _executable_task_instances_to_queued(self, max_tis: int, session:
Session = None) -> List[TI]:
"""
Finds TIs that are ready for execution with respect to pool limits,
dag concurrency, executor state, and priority.
- :param simple_dag_bag: TaskInstances associated with DAGs in the
- simple_dag_bag will be fetched from the DB and executed
- :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
+ :param max_tis: Maximum number of TIs to queue in this loop.
+ :type max_tis: int
:return: list[airflow.models.TaskInstance]
"""
executable_tis: List[TI] = []
+ # Get the pool settings. We get a lock on the pool rows, treating this
as a "critical section"
+ # Throws an exception if lock cannot be obtained, rather than blocking
+ pools = models.Pool.slots_stats(with_for_update=nowait(session),
session=session)
+
+ # If the pools are full, there is no point doing anything!
+ # If _somehow_ the pool is overfull, don't let the limit go negative -
it breaks SQL
+ pool_slots_free = max(0, sum(map(operator.itemgetter('open'),
pools.values())))
+
+ if pool_slots_free == 0:
+ self.log.debug("All pools are full!")
+ return executable_tis
+
+ max_tis = min(max_tis, pool_slots_free)
+
# Get all task instances associated with scheduled
# DagRuns which are not backfilled, in the given states,
# and the dag is not paused
task_instances_to_examine: List[TI] = (
session
.query(TI)
- .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
- .outerjoin(
- DR, and_(DR.dag_id == TI.dag_id, DR.execution_date ==
TI.execution_date)
- )
- .filter(or_(DR.run_id.is_(None), DR.run_type !=
DagRunType.BACKFILL_JOB.value))
- .outerjoin(DM, DM.dag_id == TI.dag_id)
- .filter(or_(DM.dag_id.is_(None), not_(DM.is_paused)))
+ .outerjoin(TI.dag_run)
+ .filter(or_(DR.run_id.is_(None),
+ DR.run_type != DagRunType.BACKFILL_JOB.value))
+ .join(TI.dag_model)
+ .filter(not_(DM.is_paused))
.filter(TI.state == State.SCHEDULED)
+ .options(selectinload('dag_model'))
+ .limit(max_tis)
+ .with_for_update(of=TI, **skip_locked(session=session))
Review comment:
I see. I still have to wrap my head around the locking consequences
here. I understand that here the only locked tables will be TI and DR. the DM
is "selectinload" so likely not locked.
I really just want to understand the consequences and risks. I think a the
end we should come up with some "cautious" recommendations to the users. After
looking at the potential issues and gains, my current feeling (but needs a bit
deeper thinking and testing) is that
* MySQL 5.7 -> go with 1 scheduler (and escape hatch if possible/easy to
implement). Experiment if you're adventurous, but expect moderate gains and
higher risks of concurrency related problems
* MySQL 8 -> should work fine and bring expected gains, with much lower risk
of concurrency problems.
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1052,6 +703,8 @@ def __init__(
self.max_tis_per_query: int = conf.getint('scheduler',
'max_tis_per_query')
self.processor_agent: Optional[DagFileProcessorAgent] = None
+ self.dagbag = DagBag(read_dags_from_db=True)
Review comment:
Or even `DagDbProxy` - it even can be used by DagBag by composition when
needed?
Seems That DagBag(read_dags_from_db=False) is really used for this: `A
dagbag is a collection of dags, parsed out of a folder tree` (from the
docstring). Then "DagBag(read_dags_from_db=True) is rather used as a proxy to
retrieve Dags from teh DB only `get me the DAG with this id from the DB no
matter in which folder it is`.
Sounds like DagBag could use the DagDbProxy when needed and all the places
where we create DagBag(read_dags_from_db=True) we could use directly the proxy.
That seems to better fit the single-responsibility principle.
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1179,39 +832,50 @@ def __get_concurrency_maps(
# pylint: disable=too-many-locals,too-many-statements
@provide_session
- def _find_executable_task_instances(
- self,
- simple_dag_bag: SimpleDagBag,
- session: Session = None
- ) -> List[TI]:
+ def _executable_task_instances_to_queued(self, max_tis: int, session:
Session = None) -> List[TI]:
"""
Finds TIs that are ready for execution with respect to pool limits,
dag concurrency, executor state, and priority.
- :param simple_dag_bag: TaskInstances associated with DAGs in the
- simple_dag_bag will be fetched from the DB and executed
- :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
+ :param max_tis: Maximum number of TIs to queue in this loop.
+ :type max_tis: int
:return: list[airflow.models.TaskInstance]
"""
executable_tis: List[TI] = []
+ # Get the pool settings. We get a lock on the pool rows, treating this
as a "critical section"
+ # Throws an exception if lock cannot be obtained, rather than blocking
+ pools = models.Pool.slots_stats(with_for_update=nowait(session),
session=session)
+
+ # If the pools are full, there is no point doing anything!
+ # If _somehow_ the pool is overfull, don't let the limit go negative -
it breaks SQL
+ pool_slots_free = max(0, sum(map(operator.itemgetter('open'),
pools.values())))
+
+ if pool_slots_free == 0:
+ self.log.debug("All pools are full!")
+ return executable_tis
+
+ max_tis = min(max_tis, pool_slots_free)
+
# Get all task instances associated with scheduled
# DagRuns which are not backfilled, in the given states,
# and the dag is not paused
task_instances_to_examine: List[TI] = (
session
.query(TI)
- .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
- .outerjoin(
- DR, and_(DR.dag_id == TI.dag_id, DR.execution_date ==
TI.execution_date)
- )
- .filter(or_(DR.run_id.is_(None), DR.run_type !=
DagRunType.BACKFILL_JOB.value))
- .outerjoin(DM, DM.dag_id == TI.dag_id)
- .filter(or_(DM.dag_id.is_(None), not_(DM.is_paused)))
+ .outerjoin(TI.dag_run)
+ .filter(or_(DR.run_id.is_(None),
Review comment:
Yeah. We can remove it later in separate commit.
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1705,62 +1305,216 @@ def _run_scheduler_loop(self) -> None:
loop_duration = loop_end_time - loop_start_time
self.log.debug("Ran scheduling loop in %.2f seconds",
loop_duration)
- if not is_unit_test:
+ if not is_unit_test and not num_queued_tis and not
num_finished_events:
+ # If the scheduler is doing things, don't sleep. This means
when there is work to do, the
+ # scheduler will run "as quick as possible", but when it's
stopped, it can sleep, dropping CPU
+ # usage when "idle"
time.sleep(self._processor_poll_interval)
- if self.processor_agent.done:
+ if self.num_runs > 0 and loop_count >= self.num_runs and
self.processor_agent.done:
self.log.info(
- "Exiting scheduler loop as all files have been processed
%d times", self.num_runs
+ "Exiting scheduler loop as requested number of runs (%d -
got to %d) has been reached",
+ self.num_runs, loop_count,
)
break
- def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag)
-> bool:
- if simple_dag_bag.serialized_dags:
+ def _do_scheduling(self, session) -> int:
+ """
+ This function is where the main scheduling decisions take places. It:
+
+ - Creates any necessary DAG runs by examining the
next_dagrun_create_after column of DagModel
+
+ - Finds the "next n oldest" running DAG Runs to examine for scheduling
(n=20 by default) and tries to
+ progress state (TIs to SCHEDULED, or DagRuns to SUCCESS/FAILURE etc)
+
+ By "next oldest", we mean hasn't been examined/scheduled in the most
time.
+
+ - Then, via a Critical Section (locking the rows of the Pool model) we
queue tasks, and then send them
+ to the executor.
+
+ See docs of _critical_section_execute_task_instances for more.
+
+ :return: Number of TIs enqueued in this iteration
+ :rtype: int
+ """
+ try:
+ from sqlalchemy import event
+ expected_commit = False
+
+ # Put a check in place to make sure we don't commit unexpectedly
+ @event.listens_for(session.bind, 'commit')
+ def validate_commit(_):
+ nonlocal expected_commit
+ if expected_commit:
+ expected_commit = False
+ return
+ raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA
LOCKS!")
+
+ query = DagModel.dags_needing_dagruns(session)
+ for dag_model in query:
+ dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
+ self._create_dag_run(dag_model, dag, session)
+
+ # commit the session - Release the write lock on DagModel table.
+ expected_commit = True
+ session.commit()
+ # END: create dagruns
+
+ for dag_run in DagRun.next_dagruns_to_examine(session):
+ self._schedule_dag_run(dag_run, session)
+
+ expected_commit = True
+ session.commit()
+
+ # Without this, the session has an invalid view of the DB
+ session.expunge_all()
+ # END: schedule TIs
+
+ # TODO[HA]: Do we need to call
+ # _change_state_for_tis_without_dagrun (2x) that we were before
+ # to tidy up manually tweaked TIs. Do we need to do it every
+ # time?
+
try:
- self._process_and_execute_tasks(simple_dag_bag)
- except Exception as e: # pylint: disable=broad-except
- self.log.error("Error queuing tasks")
- self.log.exception(e)
- return False
-
- # Call heartbeats
- self.log.debug("Heartbeating the executor")
- self.executor.heartbeat()
-
- self._change_state_for_tasks_failed_to_execute()
-
- # Process events from the executor
- self._process_executor_events(simple_dag_bag)
- return True
-
- def _process_and_execute_tasks(self, simple_dag_bag: SimpleDagBag) -> None:
- # Handle cases where a DAG run state is set (perhaps manually) to
- # a non-running state. Handle task instances that belong to
- # DAG runs in those states
- # If a task instance is up for retry but the corresponding DAG run
- # isn't running, mark the task instance as FAILED so we don't try
- # to re-run it.
- self._change_state_for_tis_without_dagrun(
- simple_dag_bag=simple_dag_bag,
- old_states=[State.UP_FOR_RETRY],
- new_state=State.FAILED
- )
- # If a task instance is scheduled or queued or up for reschedule,
- # but the corresponding DAG run isn't running, set the state to
- # NONE so we don't try to re-run it.
- self._change_state_for_tis_without_dagrun(
- simple_dag_bag=simple_dag_bag,
- old_states=[State.QUEUED,
- State.SCHEDULED,
- State.UP_FOR_RESCHEDULE,
- State.SENSING],
- new_state=State.NONE
+ if self.executor.slots_available <= 0:
+ # We know we can't do anything here, so don't even try!
+ self.log.debug("Executor full, skipping critical section")
+ return 0
+
+ timer = Stats.timer('scheduler.critical_section_duration')
+ timer.start()
+
+ # Find anything TIs in state SCHEDULED, try to QUEUE it (send
it to the executor)
+ num_queued_tis =
self._critical_section_execute_task_instances(session=session)
+
+ # Make sure we only sent this metric if we obtained the lock,
otherwise we'll skew the
+ # metric, way down
+ timer.stop(send=True)
+ except OperationalError as e:
+ timer.stop(send=False)
+
+ # DB specific error codes:
+ # Postgres: 55P03
+ # MySQL: 3572, 'Statement aborted because lock(s) could not be
acquired immediately and NOWAIT
+ # is set.'
+ # MySQL: 1205, 'Lock wait timeout exceeded; try restarting
transaction
+ # (when NOWAIT isn't available)
+ db_err_code = getattr(e.orig, 'pgcode', None) or e.orig.args[0]
+
+ # We could test if e.orig is an instance of
+ #
psycopg2.errors.LockNotAvailable/_mysql_exceptions.OperationalError, but that
involves
+ # importing it. This doesn't
+ if db_err_code in ('55P03', 1205, 3572):
+ self.log.debug("Critical section lock held by another
Scheduler")
+ Stats.incr('scheduler.critical_section_busy')
+ return 0
+ raise
+
+ return num_queued_tis
+ finally:
+ event.remove(session.bind, 'commit', validate_commit)
+
+ def _create_dag_run(self, dag_model: DagModel, dag: DAG, session: Session)
-> None:
+ """
+ Unconditionally create a DAG run for the given DAG, and update the
dag_model's fields to control
+ if/when the next DAGRun should be created
+ """
+ next_run_date = dag_model.next_dagrun
+ dag.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=next_run_date,
+ start_date=timezone.utcnow(),
+ state=State.RUNNING,
+ external_trigger=False,
+ session=session
)
- self._execute_task_instances(simple_dag_bag)
+
+ # Check max_active_runs, to see if we are _now_ at the limit for this
dag? (we've just created
+ # one after all)
+ active_runs_of_dag = session.query(func.count('*')).filter(
+ DagRun.dag_id == dag_model.dag_id,
+ DagRun.state == State.RUNNING, # pylint:
disable=comparison-with-callable
+ DagRun.external_trigger.is_(False),
+ ).scalar()
+
+ # TODO[HA]: add back in dagrun.timeout
+
+ if dag.max_active_runs and active_runs_of_dag >= dag.max_active_runs:
+ self.log.info(
+ "DAG %s is at (or above) max_active_runs (%d of %d), not
creating any more runs",
+ dag.dag_id, active_runs_of_dag, dag.max_active_runs
+ )
+ dag_model.next_dagrun = None
+ dag_model.next_dagrun_create_after = None
+ else:
+ next_dagrun_info = dag.next_dagrun_info(next_run_date)
+ if next_dagrun_info:
+ dag_model.next_dagrun = next_dagrun_info['execution_date']
+ dag_model.next_dagrun_create_after =
next_dagrun_info['can_be_created_after']
+ else:
+ dag_model.next_dagrun = None
+ dag_model.next_dagrun_create_after = None
+
+ # TODO[HA]: Should we do a session.flush() so we don't have to keep
lots of state/object in
+ # memory for larger dags? or expunge_all()
+
+ def _schedule_dag_run(self, dag_run: DagRun, session: Session) -> int:
+ """
+ Make scheduling decisions about an individual dag run
+
+ :return: Number of tasks scheduled
+ """
+ dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
+
+ if not dag_run.dag:
+ self.log.error(
+ "Couldn't find dag %s in DagBag/DB!", dag_run.dag_id
+ )
+ return 0
+
+ if dag_run.execution_date > timezone.utcnow() and not
dag_run.dag.allow_future_exec_dates:
+ self.log.error(
+ "Execution date is in future: %s",
+ dag_run.execution_date
+ )
+ return 0
+
+ # TODO: This query is probably horribly inefficient (though there is an
+ # index on (dag_id,state)). It is to deal with the case when a user
+ # clears more than max_active_runs older tasks -- we don't want the
+ # scheduler to suddenly go and start running tasks from all of the
+ # runs. (AIRFLOW-137/GH #1442)
+ #
+ # The longer term fix would be to have `clear` do this, and put DagRuns
+ # in to the queued state, then take DRs out of queued before creating
+ # any new ones
+ if dag_run.dag.max_active_runs:
+ currently_active_runs =
session.query(func.count(TI.execution_date.distinct())).filter(
+ TI.dag_id == dag_run.dag_id,
+ TI.state.notin_(State.finished())
+ ).scalar()
+
+ if currently_active_runs >= dag_run.dag.max_active_runs:
+ return 0
+
+ # TODO[HA]: Run verify_integrity, but only if the serialized_dag has
changed
+
+ # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
+ schedulable_tis = dag_run.update_state(session=session)
Review comment:
Very interesting discussion. I did not have time to pursue it in detail
but I will keep an eye on this and see if I can dig a bit deeper here .
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]