This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 07c40bd78a  Support failing tasks stuck in queued for hybrid executors (#39624)
07c40bd78a is described below
commit 07c40bd78a05bb4e1e8ee03d885006ac7a44e21d
Author: Niko Oliveira <[email protected]>
AuthorDate: Wed May 15 13:34:17 2024 -0700
Support failing tasks stuck in queued for hybrid executors (#39624)
Group the set of tasks that are due to be failed for being queued too long
by executor, and send each group to the appropriate executor for cleanup.
---
 airflow/jobs/scheduler_job_runner.py | 30 ++++++++++++++---------------
 tests/jobs/test_scheduler_job.py     | 37 ++++++++++++++++++++++++++----------
 2 files changed, 41 insertions(+), 26 deletions(-)
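
For context on the change below: the new scheduler loop dispatches through a
_executor_to_tis helper that is not part of this patch. A minimal sketch of
what such a grouping helper can look like, assuming ExecutorLoader.load_executor
resolves an executor name (or None, for the default) to an executor instance,
as the test changes further down exercise; the body here is illustrative, not
the repository's implementation:

    from collections import defaultdict

    from airflow.executors.executor_loader import ExecutorLoader

    def _executor_to_tis(self, tis):
        # Group task instances by the executor that owns them; a TI with no
        # explicit executor name (None) resolves to the default executor.
        grouped = defaultdict(list)
        for ti in tis:
            grouped[ExecutorLoader.load_executor(ti.executor)].append(ti)
        return grouped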
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index f2333e8d5a..3d81aaeb56 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1566,22 +1566,20 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 TI.queued_by_job_id == self.job.id,
             )
         ).all()
-        try:
-            cleaned_up_task_instances = self.job.executor.cleanup_stuck_queued_tasks(
-                tis=tasks_stuck_in_queued
-            )
-            cleaned_up_task_instances = set(cleaned_up_task_instances)
-            for ti in tasks_stuck_in_queued:
-                if repr(ti) in cleaned_up_task_instances:
-                    self._task_context_logger.warning(
-                        "Marking task instance %s stuck in queued as failed. "
-                        "If the task instance has available retries, it will be retried.",
-                        ti,
-                        ti=ti,
-                    )
-        except NotImplementedError:
-            self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")
-            ...
+
+        for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items():
+            try:
+                cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
+                for ti in stuck_tis:
+                    if repr(ti) in cleaned_up_task_instances:
+                        self._task_context_logger.warning(
+                            "Marking task instance %s stuck in queued as failed. "
+                            "If the task instance has available retries, it will be retried.",
+                            ti,
+                            ti=ti,
+                        )
+            except NotImplementedError:
+                self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")

     @provide_session
     def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
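
Note the contract the loop relies on: cleanup_stuck_queued_tasks returns
readable representations of the task instances it actually cleaned up, which
is why membership is checked with repr(ti). A hedged sketch of an executor
opting into this cleanup (the class name and teardown step are hypothetical;
BaseExecutor's default implementation raises NotImplementedError, which the
scheduler catches and logs above):

    from airflow.executors.base_executor import BaseExecutor

    class StuckQueuedAwareExecutor(BaseExecutor):
        # Hypothetical example class, not from the Airflow codebase.
        def cleanup_stuck_queued_tasks(self, tis):
            readable_tis = []
            for ti in tis:
                # Executor-specific teardown of ti's queued workload goes here.
                readable_tis.append(repr(ti))
            return readable_tis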
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 85399892ac..9fcdba5ac5 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1809,24 +1809,41 @@ class TestSchedulerJob:
         # Second executor called for ti3
         mock_executors[1].try_adopt_task_instances.assert_called_once_with([ti3])

-    def test_fail_stuck_queued_tasks(self, dag_maker, session):
-        with dag_maker("test_fail_stuck_queued_tasks"):
+    def test_fail_stuck_queued_tasks(self, dag_maker, session, mock_executors):
+        with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
             op1 = EmptyOperator(task_id="op1")
+            op2 = EmptyOperator(task_id="op2", executor="default_exec")
+            op3 = EmptyOperator(task_id="op3", executor="secondary_exec")

         dr = dag_maker.create_dagrun()
-        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
-        ti.state = State.QUEUED
-        ti.queued_dttm = timezone.utcnow() - timedelta(minutes=15)
+        ti1 = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti2 = dr.get_task_instance(task_id=op2.task_id, session=session)
+        ti3 = dr.get_task_instance(task_id=op3.task_id, session=session)
+        for ti in [ti1, ti2, ti3]:
+            ti.state = State.QUEUED
+            ti.queued_dttm = timezone.utcnow() - timedelta(minutes=15)
         session.commit()
-        executor = MagicMock()
-        executor.cleanup_stuck_queued_tasks = mock.MagicMock()
-        scheduler_job = Job(executor=executor)
+        scheduler_job = Job()
         job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0)
         job_runner._task_queued_timeout = 300
-        job_runner._fail_tasks_stuck_in_queued()
+        with mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as loader_mock:
+            # The executors are mocked, so cannot be loaded/imported. Mock load_executor and return the
+            # correct object for the given input executor name.
+            loader_mock.side_effect = lambda *x: {
+                ("default_exec",): mock_executors[0],
+                (None,): mock_executors[0],
+                ("secondary_exec",): mock_executors[1],
+            }[x]
+            job_runner._fail_tasks_stuck_in_queued()

-        job_runner.job.executor.cleanup_stuck_queued_tasks.assert_called_once()
+        # The default executor is called for ti1 (no explicit executor set, so the
+        # default is used) and ti2 (explicitly assigned to the default executor).
+        try:
+            mock_executors[0].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti1, ti2])
+        except AssertionError:
+            mock_executors[0].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, ti1])
+        mock_executors[1].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3])
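
The try/except AssertionError above exists because the order in which ti1 and
ti2 reach the default executor is not guaranteed, so the test accepts either
permutation. A hypothetical order-insensitive variant of the same check (not
part of this patch) could compare the keyword argument as a set instead:

    mock_executors[0].cleanup_stuck_queued_tasks.assert_called_once()
    _, kwargs = mock_executors[0].cleanup_stuck_queued_tasks.call_args
    assert set(kwargs["tis"]) == {ti1, ti2}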

     def test_fail_stuck_queued_tasks_raises_not_implemented(self, dag_maker, session, caplog):
         with dag_maker("test_fail_stuck_queued_tasks"):