This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 341d36d2f16 [v2-10-test] Re-queue tasks when they are stuck in queued (#43520) (#44158)
341d36d2f16 is described below
commit 341d36d2f161ae450ceea70d8a58e16777d9099d
Author: Jens Scheffler <[email protected]>
AuthorDate: Tue Nov 19 21:06:58 2024 +0100
[v2-10-test] Re-queue tasks when they are stuck in queued (#43520) (#44158)
* [v2-10-test] Re-queue tasks when they are stuck in queued (#43520)
The old "stuck in queued" logic just failed the tasks. Now we requeue
them. We accomplish this by revoking the task from executor and setting state
to scheduled. We'll re-queue it up to 2 times. Number of times is
configurable by hidden config.
We added a method to base executor revoke_task because, it's a discrete
operation that is required for this feature, and it might be useful in other
cases e.g. when detecting as zombies etc. We set state to failed or scheduled
directly from scheduler (rather than sending through the event buffer) because
event buffer makes more sense for handling external events -- why round trip
through the executor and back to scheduler when scheduler is initiating the
action? Anyway this avoids h [...]
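For orientation, here is a rough, self-contained Python sketch of the flow
described above (illustrative only, not the committed code; every name in it
is an invented stand-in, and MAX_REQUEUE_ATTEMPTS mirrors the hidden config's
default of 2):

    # Hypothetical sketch of the stuck-in-queued handling described above.
    from dataclasses import dataclass

    MAX_REQUEUE_ATTEMPTS = 2  # stand-in for the hidden retry config

    @dataclass
    class FakeTI:
        task_id: str
        state: str = "queued"
        times_stuck: int = 0  # Airflow derives this from the Log table instead

    def handle_stuck_in_queued(stuck_tis, revoke):
        for ti in stuck_tis:
            revoke(ti)  # executor forgets the task; no TI state change here
            if ti.times_stuck < MAX_REQUEUE_ATTEMPTS:
                ti.times_stuck += 1
                ti.state = "scheduled"  # requeue; scheduler re-examines it
            else:
                ti.state = "failed"  # give up after repeated requeue attempts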
---------
(cherry picked from commit a41feeb5aedad842be2b0f954e0be30c767dbc5e)
Co-authored-by: Daniel Imberman <[email protected]>
Co-authored-by: Daniel Standish <[email protected]>
Co-authored-by: Jed Cunningham <[email protected]>
* fix test_handle_stuck_queued_tasks_multiple_attempts (#44093)
---------
Co-authored-by: Daniel Imberman <[email protected]>
Co-authored-by: Daniel Standish <[email protected]>
Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: GPK <[email protected]>
---
airflow/executors/base_executor.py | 26 +++++-
airflow/jobs/scheduler_job_runner.py | 161 ++++++++++++++++++++++++++++-------
docs/spelling_wordlist.txt | 1 +
tests/jobs/test_scheduler_job.py | 128 +++++++++++++++++++++++++---
4 files changed, 270 insertions(+), 46 deletions(-)
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 57568af1997..5a5cf2d73f1 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -26,6 +26,7 @@ from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple
import pendulum
+from deprecated import deprecated
from airflow.cli.cli_config import DefaultHelpParser
from airflow.configuration import conf
@@ -545,7 +546,12 @@ class BaseExecutor(LoggingMixin):
"""Get called when the daemon receives a SIGTERM."""
raise NotImplementedError()
- def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover
+ @deprecated(
+ reason="Replaced by function `revoke_task`.",
+ category=RemovedInAirflow3Warning,
+ action="ignore",
+ )
+ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
"""
Handle remnants of tasks that were failed because they were stuck in queued.
@@ -556,7 +562,23 @@ class BaseExecutor(LoggingMixin):
:param tis: List of Task Instances to clean up
:return: List of readable task instances for a warning message
"""
- raise NotImplementedError()
+ raise NotImplementedError
+
+ def revoke_task(self, *, ti: TaskInstance):
+ """
+ Attempt to remove task from executor.
+
+ It should attempt to ensure that the task is no longer running on the worker,
+ and ensure that it is cleared out from internal data structures.
+
+ It should *not* change the state of the task in airflow, or add any events
+ to the event buffer.
+
+ It should not raise any error.
+
+ :param ti: Task instance to remove
+ """
+ raise NotImplementedError
def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
"""
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index aa4e8d4f26a..c9afd40f719 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -25,12 +25,14 @@ import sys
import time
import warnings
from collections import Counter, defaultdict, deque
+from contextlib import suppress
from dataclasses import dataclass
from datetime import timedelta
from functools import lru_cache, partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator
+from deprecated import deprecated
from sqlalchemy import and_, delete, func, not_, or_, select, text, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
@@ -97,6 +99,9 @@ TI = TaskInstance
DR = DagRun
DM = DagModel
+TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued reschedule"
+""":meta private:"""
+
@dataclass
class ConcurrencyMap:
@@ -228,6 +233,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
stalled_task_timeout, task_adoption_timeout, worker_pods_pending_timeout, task_queued_timeout
)
+ # this param is intentionally undocumented
+ self._num_stuck_queued_retries = conf.getint(
+ section="scheduler",
+ key="num_stuck_in_queued_retries",
+ fallback=2,
+ )
+
self.do_pickle = do_pickle
if log:
@@ -1093,7 +1105,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
timers.call_regular_interval(
conf.getfloat("scheduler", "task_queued_timeout_check_interval"),
- self._fail_tasks_stuck_in_queued,
+ self._handle_tasks_stuck_in_queued,
)
timers.call_regular_interval(
@@ -1141,6 +1153,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
for executor in self.job.executors:
try:
# this is backcompat check if executor does not inherit from BaseExecutor
+ # todo: remove in airflow 3.0
if not hasattr(executor, "_task_event_logs"):
continue
with create_session() as session:
@@ -1772,48 +1785,132 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self.job.executor.send_callback(request)
@provide_session
- def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
+ def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
"""
- Mark tasks stuck in queued for longer than `task_queued_timeout` as failed.
+ Handle the scenario where a task is queued for longer than `task_queued_timeout`.
Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses
track of a task, a cluster can't further scale up its workers, etc.), but tasks
- should not be stuck in queued for a long time. This will mark tasks stuck in
- queued for longer than `self._task_queued_timeout` as failed. If the task has
- available retries, it will be retried.
+ should not be stuck in queued for a long time.
+
+ We will attempt to requeue the task (by revoking it from executor and setting to
+ scheduled) up to 2 times before failing the task.
"""
- self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued method")
+ tasks_stuck_in_queued = self._get_tis_stuck_in_queued(session)
+ for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items():
+ try:
+ for ti in stuck_tis:
+ executor.revoke_task(ti=ti)
+ self._maybe_requeue_stuck_ti(
+ ti=ti,
+ session=session,
+ )
+ except NotImplementedError:
+ # this block only gets entered if the executor has not implemented `revoke_task`.
+ # in which case, we try the fallback logic
+ # todo: remove the call to _stuck_in_queued_backcompat_logic in airflow 3.0.
+ # after 3.0, `cleanup_stuck_queued_tasks` will be removed, so we should
+ # just continue immediately.
+ self._stuck_in_queued_backcompat_logic(executor, stuck_tis)
+ continue
- tasks_stuck_in_queued = session.scalars(
+ def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]:
+ """Query db for TIs that are stuck in queued."""
+ return session.scalars(
select(TI).where(
TI.state == TaskInstanceState.QUEUED,
TI.queued_dttm < (timezone.utcnow() - timedelta(seconds=self._task_queued_timeout)),
TI.queued_by_job_id == self.job.id,
)
- ).all()
+ )
- for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items():
- try:
- cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
- for ti in stuck_tis:
- if repr(ti) in cleaned_up_task_instances:
- self.log.warning(
- "Marking task instance %s stuck in queued as
failed. "
- "If the task instance has available retries, it
will be retried.",
- ti,
- )
- session.add(
- Log(
- event="stuck in queued",
- task_instance=ti.key,
- extra=(
- "Task will be marked as failed. If the
task instance has "
- "available retries, it will be retried."
- ),
- )
- )
- except NotImplementedError:
- self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")
+ def _maybe_requeue_stuck_ti(self, *, ti, session):
+ """
+ Requeue task if it has not been attempted too many times.
+
+ Otherwise, fail it.
+ """
+ num_times_stuck = self._get_num_times_stuck_in_queued(ti, session)
+ if num_times_stuck < self._num_stuck_queued_retries:
+ self.log.info("Task stuck in queued; will try to requeue. task_id=%s", ti.task_id)
+ session.add(
+ Log(
+ event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
+ task_instance=ti.key,
+ extra=(
+ f"Task was in queued state for longer than
{self._task_queued_timeout} "
+ "seconds; task state will be set back to scheduled."
+ ),
+ )
+ )
+ self._reschedule_stuck_task(ti)
+ else:
+ self.log.info(
+ "Task requeue attempts exceeded max; marking failed.
task_instance=%s",
+ ti,
+ )
+ session.add(
+ Log(
+ event="stuck in queued tries exceeded",
+ task_instance=ti.key,
+ extra=f"Task was requeued more than
{self._num_stuck_queued_retries} times and will be failed.",
+ )
+ )
+ ti.set_state(TaskInstanceState.FAILED, session=session)
+
+ @deprecated(
+ reason="This is backcompat layer for older executor interface. Should
be removed in 3.0",
+ category=RemovedInAirflow3Warning,
+ action="ignore",
+ )
+ def _stuck_in_queued_backcompat_logic(self, executor, stuck_tis):
+ """
+ Try to invoke stuck in queued cleanup for older executor interface.
+
+ TODO: remove in airflow 3.0
+
+ Here we handle the case where the executor pre-dates the interface change that
+ introduced `cleanup_tasks_stuck_in_queued` and deprecated `cleanup_stuck_queued_tasks`.
+
+ """
+ with suppress(NotImplementedError):
+ for ti_repr in executor.cleanup_stuck_queued_tasks(tis=stuck_tis):
+ self.log.warning(
+ "Task instance %s stuck in queued. Will be set to failed.",
+ ti_repr,
+ )
+
+ @provide_session
+ def _reschedule_stuck_task(self, ti, session=NEW_SESSION):
+ session.execute(
+ update(TI)
+ .where(TI.filter_for_tis([ti]))
+ .values(
+ state=TaskInstanceState.SCHEDULED,
+ queued_dttm=None,
+ )
+ .execution_options(synchronize_session=False)
+ )
+
+ @provide_session
+ def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int:
+ """
+ Check the Log table to see how many times a taskinstance has been stuck in queued.
+
+ We can then use this information to determine whether to reschedule a task or fail it.
+ """
+ return (
+ session.query(Log)
+ .where(
+ Log.task_id == ti.task_id,
+ Log.dag_id == ti.dag_id,
+ Log.run_id == ti.run_id,
+ Log.map_index == ti.map_index,
+ Log.try_number == ti.try_number,
+ Log.event == TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
+ )
+ .count()
+ )
@provide_session
def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
@@ -2102,7 +2199,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
updated_count = sum(self._set_orphaned(dataset) for dataset in orphaned_dataset_query)
Stats.gauge("dataset.orphaned", updated_count)
- def _executor_to_tis(self, tis: list[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]:
+ def _executor_to_tis(self, tis: Iterable[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]:
"""Organize TIs into lists per their respective executor."""
_executor_to_tis: defaultdict[BaseExecutor, list[TaskInstance]] = defaultdict(list)
for ti in tis:
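The retry accounting in the hunk above counts Log rows with the reschedule
event; the same idea in a standalone, hypothetical form (an in-memory counter
stands in for the Log table -- illustrative only):

    from collections import Counter

    stuck_counts = Counter()  # keyed like (dag_id, task_id, run_id, map_index, try_number)

    def should_requeue(ti_key, max_retries=2):
        # Analogue of _get_num_times_stuck_in_queued(ti) < _num_stuck_queued_retries;
        # the real scheduler records each requeue as a Log row instead.
        if stuck_counts[ti_key] < max_retries:
            stuck_counts[ti_key] += 1
            return True
        return False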
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 1ca4c8dc455..6f8d5015ed4 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1359,6 +1359,7 @@ repos
repr
req
reqs
+requeued
Reserialize
reserialize
reserialized
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index d6534685793..97d3e9fe87d 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -28,6 +28,7 @@ from importlib import reload
from typing import Generator
from unittest import mock
from unittest.mock import MagicMock, PropertyMock, patch
+from uuid import uuid4
import psutil
import pytest
@@ -55,6 +56,7 @@ from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
from airflow.models.db_callback_request import DbCallbackRequest
+from airflow.models.log import Log
from airflow.models.pool import Pool
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey
@@ -123,6 +125,19 @@ def load_examples():
# Patch the MockExecutor into the dict of known executors in the Loader
+@contextlib.contextmanager
+def _loader_mock(mock_executors):
+ with mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as loader_mock:
+ # The executors are mocked, so cannot be loaded/imported. Mock load_executor and return the
+ # correct object for the given input executor name.
+ loader_mock.side_effect = lambda *x: {
+ ("default_exec",): mock_executors[0],
+ (None,): mock_executors[0],
+ ("secondary_exec",): mock_executors[1],
+ }[x]
+ yield
+
+
@patch.dict(
ExecutorLoader.executors, {MOCK_EXECUTOR: f"{MockExecutor.__module__}.{MockExecutor.__qualname__}"}
)
@@ -2177,7 +2192,18 @@ class TestSchedulerJob:
# Second executor called for ti3
mock_executors[1].try_adopt_task_instances.assert_called_once_with([ti3])
- def test_fail_stuck_queued_tasks(self, dag_maker, session, mock_executors):
+ def test_handle_stuck_queued_tasks_backcompat(self, dag_maker, session, mock_executors):
+ """
Verify backward compatibility of the executor interface w.r.t. stuck queued.
+
Prior to #43520, scheduler called method `cleanup_stuck_queued_tasks`, which failed tis.
+
After #43520, scheduler calls `cleanup_tasks_stuck_in_queued`, which requeues tis.
+
At Airflow 3.0, we should remove backcompat support for this old function. But for now
+ we verify that we call it as a fallback.
+ """
+ # todo: remove in airflow 3.0
with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
op1 = EmptyOperator(task_id="op1")
op2 = EmptyOperator(task_id="op2", executor="default_exec")
@@ -2194,26 +2220,102 @@ class TestSchedulerJob:
scheduler_job = Job()
job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0)
job_runner._task_queued_timeout = 300
+ mock_exec_1 = mock_executors[0]
+ mock_exec_2 = mock_executors[1]
+ mock_exec_1.revoke_task.side_effect = NotImplementedError
+ mock_exec_2.revoke_task.side_effect = NotImplementedError
with mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as loader_mock:
# The executors are mocked, so cannot be loaded/imported. Mock load_executor and return the
# correct object for the given input executor name.
loader_mock.side_effect = lambda *x: {
- ("default_exec",): mock_executors[0],
- (None,): mock_executors[0],
- ("secondary_exec",): mock_executors[1],
+ ("default_exec",): mock_exec_1,
+ (None,): mock_exec_1,
+ ("secondary_exec",): mock_exec_2,
}[x]
- job_runner._fail_tasks_stuck_in_queued()
+ job_runner._handle_tasks_stuck_in_queued()
# Default executor is called for ti1 (no explicit executor override uses default) and ti2 (where we
# explicitly marked that for execution by the default executor)
try:
- mock_executors[0].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti1, ti2])
+ mock_exec_1.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti1, ti2])
except AssertionError:
- mock_executors[0].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, ti1])
- mock_executors[1].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3])
+ mock_exec_1.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, ti1])
+ mock_exec_2.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3])
+
+ @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
+ def test_handle_stuck_queued_tasks_multiple_attempts(self, dag_maker, session, mock_executors):
+ """Verify that tasks stuck in queued will be rescheduled up to N
times."""
+ with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
+ EmptyOperator(task_id="op1")
+ EmptyOperator(task_id="op2", executor="default_exec")
+
+ def _queue_tasks(tis):
+ for ti in tis:
+ ti.state = "queued"
+ ti.queued_dttm = timezone.utcnow()
+ session.commit()
+
+ run_id = str(uuid4())
+ dr = dag_maker.create_dagrun(run_id=run_id)
+
+ tis = dr.get_task_instances(session=session)
+ _queue_tasks(tis=tis)
+ scheduler_job = Job()
+ scheduler = SchedulerJobRunner(job=scheduler_job, num_runs=0)
+ # job_runner._reschedule_stuck_task = MagicMock()
+ scheduler._task_queued_timeout = -300 # always in violation of timeout
+
+ with _loader_mock(mock_executors):
+ scheduler._handle_tasks_stuck_in_queued(session=session)
+
+ # If the task gets stuck in queued once, we reset it to scheduled
+ tis = dr.get_task_instances(session=session)
+ assert [x.state for x in tis] == ["scheduled", "scheduled"]
+ assert [x.queued_dttm for x in tis] == [None, None]
+
+ _queue_tasks(tis=tis)
+ log_events = [x.event for x in session.scalars(select(Log).where(Log.run_id == run_id)).all()]
+ assert log_events == [
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ ]
+
+ with _loader_mock(mock_executors):
+ scheduler._handle_tasks_stuck_in_queued(session=session)
+ session.commit()
+
+ log_events = [x.event for x in session.scalars(select(Log).where(Log.run_id == run_id)).all()]
+ assert log_events == [
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ ]
+ mock_executors[0].fail.assert_not_called()
+ tis = dr.get_task_instances(session=session)
+ assert [x.state for x in tis] == ["scheduled", "scheduled"]
+ _queue_tasks(tis=tis)
+
+ with _loader_mock(mock_executors):
+ scheduler._handle_tasks_stuck_in_queued(session=session)
+ session.commit()
+ log_events = [x.event for x in session.scalars(select(Log).where(Log.run_id == run_id)).all()]
+ assert log_events == [
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ "stuck in queued tries exceeded",
+ "stuck in queued tries exceeded",
+ ]
+
+ mock_executors[0].fail.assert_not_called()  # just demoing that we don't fail with executor method
+ states = [x.state for x in dr.get_task_instances(session=session)]
+ assert states == ["failed", "failed"]
- def test_fail_stuck_queued_tasks_raises_not_implemented(self, dag_maker, session, caplog):
+ def test_revoke_task_not_imp_tolerated(self, dag_maker, session, caplog):
+ """Test that if executor no implement revoke_task then we don't blow
up."""
with dag_maker("test_fail_stuck_queued_tasks"):
op1 = EmptyOperator(task_id="op1")
@@ -2224,12 +2326,14 @@ class TestSchedulerJob:
session.commit()
from airflow.executors.local_executor import LocalExecutor
+ assert "revoke_task" in BaseExecutor.__dict__
+ # this is just verifying that LocalExecutor is good enough for this test
+ # in that it does not implement revoke_task
+ assert "revoke_task" not in LocalExecutor.__dict__
scheduler_job = Job(executor=LocalExecutor())
job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0)
job_runner._task_queued_timeout = 300
- with caplog.at_level(logging.DEBUG):
- job_runner._fail_tasks_stuck_in_queued()
- assert "Executor doesn't support cleanup of stuck queued tasks.
Skipping." in caplog.text
+ job_runner._handle_tasks_stuck_in_queued()
@mock.patch("airflow.dag_processing.manager.DagFileProcessorAgent")
def test_executor_end_called(self, mock_processor_agent, mock_executors):