This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new 341d36d2f16 [v2-10-test] Re-queue tasks when they are stuck in queued 
(#43520) (#44158)
341d36d2f16 is described below

commit 341d36d2f161ae450ceea70d8a58e16777d9099d
Author: Jens Scheffler <[email protected]>
AuthorDate: Tue Nov 19 21:06:58 2024 +0100

    [v2-10-test] Re-queue tasks when they are stuck in queued (#43520) (#44158)
    
    * [v2-10-test] Re-queue tasks when they are stuck in queued (#43520)
    
    The old "stuck in queued" logic just failed the tasks.  Now we requeue 
them.  We accomplish this by revoking the task from executor and setting state 
to scheduled.  We'll re-queue it up to 2 times.  Number of times is 
configurable by hidden config.
    
    We added a method to the base executor, revoke_task, because it's a discrete 
operation that is required for this feature, and it might be useful in other 
cases, e.g. when tasks are detected as zombies.  We set state to failed or scheduled 
directly from the scheduler (rather than sending through the event buffer) because 
the event buffer makes more sense for handling external events -- why round trip 
through the executor and back to the scheduler when the scheduler is initiating the 
action?  Anyway this avoids h [...]
    
    ---------
    
    (cherry picked from commit a41feeb5aedad842be2b0f954e0be30c767dbc5e)
    
    Co-authored-by: Daniel Imberman <[email protected]>
    Co-authored-by: Daniel Standish 
<[email protected]>
    Co-authored-by: Jed Cunningham 
<[email protected]>
    
    * fix test_handle_stuck_queued_tasks_multiple_attempts (#44093)
    
    ---------
    
    Co-authored-by: Daniel Imberman <[email protected]>
    Co-authored-by: Daniel Standish 
<[email protected]>
    Co-authored-by: Jed Cunningham 
<[email protected]>
    Co-authored-by: GPK <[email protected]>
---
 airflow/executors/base_executor.py   |  26 +++++-
 airflow/jobs/scheduler_job_runner.py | 161 ++++++++++++++++++++++++++++-------
 docs/spelling_wordlist.txt           |   1 +
 tests/jobs/test_scheduler_job.py     | 128 +++++++++++++++++++++++++---
 4 files changed, 270 insertions(+), 46 deletions(-)

diff --git a/airflow/executors/base_executor.py 
b/airflow/executors/base_executor.py
index 57568af1997..5a5cf2d73f1 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -26,6 +26,7 @@ from dataclasses import dataclass, field
 from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple
 
 import pendulum
+from deprecated import deprecated
 
 from airflow.cli.cli_config import DefaultHelpParser
 from airflow.configuration import conf
@@ -545,7 +546,12 @@ class BaseExecutor(LoggingMixin):
         """Get called when the daemon receives a SIGTERM."""
         raise NotImplementedError()
 
-    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> 
list[str]:  # pragma: no cover
+    @deprecated(
+        reason="Replaced by function `revoke_task`.",
+        category=RemovedInAirflow3Warning,
+        action="ignore",
+    )
+    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
         """
         Handle remnants of tasks that were failed because they were stuck in 
queued.
 
@@ -556,7 +562,23 @@ class BaseExecutor(LoggingMixin):
         :param tis: List of Task Instances to clean up
         :return: List of readable task instances for a warning message
         """
-        raise NotImplementedError()
+        raise NotImplementedError
+
+    def revoke_task(self, *, ti: TaskInstance):
+        """
+        Attempt to remove task from executor.
+
+        It should attempt to ensure that the task is no longer running on the 
worker,
+        and ensure that it is cleared out from internal data structures.
+
+        It should *not* change the state of the task in airflow, or add any 
events
+        to the event buffer.
+
+        It should not raise any error.
+
+        :param ti: Task instance to remove
+        """
+        raise NotImplementedError
 
     def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> 
Sequence[TaskInstance]:
         """
diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index aa4e8d4f26a..c9afd40f719 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -25,12 +25,14 @@ import sys
 import time
 import warnings
 from collections import Counter, defaultdict, deque
+from contextlib import suppress
 from dataclasses import dataclass
 from datetime import timedelta
 from functools import lru_cache, partial
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator
 
+from deprecated import deprecated
 from sqlalchemy import and_, delete, func, not_, or_, select, text, update
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
@@ -97,6 +99,9 @@ TI = TaskInstance
 DR = DagRun
 DM = DagModel
 
+TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued reschedule"
+""":meta private:"""
+
 
 @dataclass
 class ConcurrencyMap:
@@ -228,6 +233,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             stalled_task_timeout, task_adoption_timeout, 
worker_pods_pending_timeout, task_queued_timeout
         )
 
+        # this param is intentionally undocumented
+        self._num_stuck_queued_retries = conf.getint(
+            section="scheduler",
+            key="num_stuck_in_queued_retries",
+            fallback=2,
+        )
+
         self.do_pickle = do_pickle
 
         if log:
@@ -1093,7 +1105,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
         timers.call_regular_interval(
             conf.getfloat("scheduler", "task_queued_timeout_check_interval"),
-            self._fail_tasks_stuck_in_queued,
+            self._handle_tasks_stuck_in_queued,
         )
 
         timers.call_regular_interval(
@@ -1141,6 +1153,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 for executor in self.job.executors:
                     try:
                         # this is backcompat check if executor does not 
inherit from BaseExecutor
+                        # todo: remove in airflow 3.0
                         if not hasattr(executor, "_task_event_logs"):
                             continue
                         with create_session() as session:
@@ -1772,48 +1785,132 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         self.job.executor.send_callback(request)
 
     @provide_session
-    def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> 
None:
+    def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> 
None:
         """
-        Mark tasks stuck in queued for longer than `task_queued_timeout` as 
failed.
+        Handle the scenario where a task is queued for longer than 
`task_queued_timeout`.
 
         Tasks can get stuck in queued for a wide variety of reasons (e.g. 
celery loses
         track of a task, a cluster can't further scale up its workers, etc.), 
but tasks
-        should not be stuck in queued for a long time. This will mark tasks 
stuck in
-        queued for longer than `self._task_queued_timeout` as failed. If the 
task has
-        available retries, it will be retried.
+        should not be stuck in queued for a long time.
+
+        We will attempt to requeue the task (by revoking it from executor and 
setting to
+        scheduled) up to 2 times before failing the task.
         """
-        self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued 
method")
+        tasks_stuck_in_queued = self._get_tis_stuck_in_queued(session)
+        for executor, stuck_tis in 
self._executor_to_tis(tasks_stuck_in_queued).items():
+            try:
+                for ti in stuck_tis:
+                    executor.revoke_task(ti=ti)
+                    self._maybe_requeue_stuck_ti(
+                        ti=ti,
+                        session=session,
+                    )
+            except NotImplementedError:
+                # this block only gets entered if the executor has not 
implemented `revoke_task`.
+                # in which case, we try the fallback logic
+                # todo: remove the call to _stuck_in_queued_backcompat_logic 
in airflow 3.0.
+                #   after 3.0, `cleanup_stuck_queued_tasks` will be removed, 
so we should
+                #   just continue immediately.
+                self._stuck_in_queued_backcompat_logic(executor, stuck_tis)
+                continue
 
-        tasks_stuck_in_queued = session.scalars(
+    def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]:
+        """Query db for TIs that are stuck in queued."""
+        return session.scalars(
             select(TI).where(
                 TI.state == TaskInstanceState.QUEUED,
                 TI.queued_dttm < (timezone.utcnow() - 
timedelta(seconds=self._task_queued_timeout)),
                 TI.queued_by_job_id == self.job.id,
             )
-        ).all()
+        )
 
-        for executor, stuck_tis in 
self._executor_to_tis(tasks_stuck_in_queued).items():
-            try:
-                cleaned_up_task_instances = 
set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
-                for ti in stuck_tis:
-                    if repr(ti) in cleaned_up_task_instances:
-                        self.log.warning(
-                            "Marking task instance %s stuck in queued as 
failed. "
-                            "If the task instance has available retries, it 
will be retried.",
-                            ti,
-                        )
-                        session.add(
-                            Log(
-                                event="stuck in queued",
-                                task_instance=ti.key,
-                                extra=(
-                                    "Task will be marked as failed. If the 
task instance has "
-                                    "available retries, it will be retried."
-                                ),
-                            )
-                        )
-            except NotImplementedError:
-                self.log.debug("Executor doesn't support cleanup of stuck 
queued tasks. Skipping.")
+    def _maybe_requeue_stuck_ti(self, *, ti, session):
+        """
+        Requeue task if it has not been attempted too many times.
+
+        Otherwise, fail it.
+        """
+        num_times_stuck = self._get_num_times_stuck_in_queued(ti, session)
+        if num_times_stuck < self._num_stuck_queued_retries:
+            self.log.info("Task stuck in queued; will try to requeue. 
task_id=%s", ti.task_id)
+            session.add(
+                Log(
+                    event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
+                    task_instance=ti.key,
+                    extra=(
+                        f"Task was in queued state for longer than 
{self._task_queued_timeout} "
+                        "seconds; task state will be set back to scheduled."
+                    ),
+                )
+            )
+            self._reschedule_stuck_task(ti)
+        else:
+            self.log.info(
+                "Task requeue attempts exceeded max; marking failed. 
task_instance=%s",
+                ti,
+            )
+            session.add(
+                Log(
+                    event="stuck in queued tries exceeded",
+                    task_instance=ti.key,
+                    extra=f"Task was requeued more than 
{self._num_stuck_queued_retries} times and will be failed.",
+                )
+            )
+            ti.set_state(TaskInstanceState.FAILED, session=session)
+
+    @deprecated(
+        reason="This is backcompat layer for older executor interface. Should 
be removed in 3.0",
+        category=RemovedInAirflow3Warning,
+        action="ignore",
+    )
+    def _stuck_in_queued_backcompat_logic(self, executor, stuck_tis):
+        """
+        Try to invoke stuck in queued cleanup for older executor interface.
+
+        TODO: remove in airflow 3.0
+
+        Here we handle case where the executor pre-dates the interface change 
that
+        introduced `cleanup_tasks_stuck_in_queued` and deprecated 
`cleanup_stuck_queued_tasks`.
+
+        """
+        with suppress(NotImplementedError):
+            for ti_repr in executor.cleanup_stuck_queued_tasks(tis=stuck_tis):
+                self.log.warning(
+                    "Task instance %s stuck in queued. Will be set to failed.",
+                    ti_repr,
+                )
+
+    @provide_session
+    def _reschedule_stuck_task(self, ti, session=NEW_SESSION):
+        session.execute(
+            update(TI)
+            .where(TI.filter_for_tis([ti]))
+            .values(
+                state=TaskInstanceState.SCHEDULED,
+                queued_dttm=None,
+            )
+            .execution_options(synchronize_session=False)
+        )
+
+    @provide_session
+    def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: 
Session = NEW_SESSION) -> int:
+        """
+        Check the Log table to see how many times a taskinstance has been 
stuck in queued.
+
+        We can then use this information to determine whether to reschedule a 
task or fail it.
+        """
+        return (
+            session.query(Log)
+            .where(
+                Log.task_id == ti.task_id,
+                Log.dag_id == ti.dag_id,
+                Log.run_id == ti.run_id,
+                Log.map_index == ti.map_index,
+                Log.try_number == ti.try_number,
+                Log.event == TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
+            )
+            .count()
+        )
 
     @provide_session
     def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
@@ -2102,7 +2199,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         updated_count = sum(self._set_orphaned(dataset) for dataset in 
orphaned_dataset_query)
         Stats.gauge("dataset.orphaned", updated_count)
 
-    def _executor_to_tis(self, tis: list[TaskInstance]) -> dict[BaseExecutor, 
list[TaskInstance]]:
+    def _executor_to_tis(self, tis: Iterable[TaskInstance]) -> 
dict[BaseExecutor, list[TaskInstance]]:
         """Organize TIs into lists per their respective executor."""
         _executor_to_tis: defaultdict[BaseExecutor, list[TaskInstance]] = 
defaultdict(list)
         for ti in tis:
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 1ca4c8dc455..6f8d5015ed4 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1359,6 +1359,7 @@ repos
 repr
 req
 reqs
+requeued
 Reserialize
 reserialize
 reserialized
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index d6534685793..97d3e9fe87d 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -28,6 +28,7 @@ from importlib import reload
 from typing import Generator
 from unittest import mock
 from unittest.mock import MagicMock, PropertyMock, patch
+from uuid import uuid4
 
 import psutil
 import pytest
@@ -55,6 +56,7 @@ from airflow.models.dagbag import DagBag
 from airflow.models.dagrun import DagRun
 from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, 
DatasetModel
 from airflow.models.db_callback_request import DbCallbackRequest
+from airflow.models.log import Log
 from airflow.models.pool import Pool
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, 
TaskInstanceKey
@@ -123,6 +125,19 @@ def load_examples():
 
 
 # Patch the MockExecutor into the dict of known executors in the Loader
[email protected]
+def _loader_mock(mock_executors):
+    with 
mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as 
loader_mock:
+        # The executors are mocked, so cannot be loaded/imported. Mock 
load_executor and return the
+        # correct object for the given input executor name.
+        loader_mock.side_effect = lambda *x: {
+            ("default_exec",): mock_executors[0],
+            (None,): mock_executors[0],
+            ("secondary_exec",): mock_executors[1],
+        }[x]
+        yield
+
+
 @patch.dict(
     ExecutorLoader.executors, {MOCK_EXECUTOR: 
f"{MockExecutor.__module__}.{MockExecutor.__qualname__}"}
 )
@@ -2177,7 +2192,18 @@ class TestSchedulerJob:
         # Second executor called for ti3
         
mock_executors[1].try_adopt_task_instances.assert_called_once_with([ti3])
 
-    def test_fail_stuck_queued_tasks(self, dag_maker, session, mock_executors):
+    def test_handle_stuck_queued_tasks_backcompat(self, dag_maker, session, 
mock_executors):
+        """
+        Verify backward compatibility of the executor interface w.r.t. stuck 
queued.
+
+        Prior to #43520, scheduler called method `cleanup_stuck_queued_tasks`, 
which failed tis.
+
+        After #43520, scheduler calls `cleanup_tasks_stuck_in_queued`, which 
requeues tis.
+
+        At Airflow 3.0, we should remove backcompat support for this old 
function. But for now
+        we verify that we call it as a fallback.
+        """
+        # todo: remove in airflow 3.0
         with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
             op1 = EmptyOperator(task_id="op1")
             op2 = EmptyOperator(task_id="op2", executor="default_exec")
@@ -2194,26 +2220,102 @@ class TestSchedulerJob:
         scheduler_job = Job()
         job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0)
         job_runner._task_queued_timeout = 300
+        mock_exec_1 = mock_executors[0]
+        mock_exec_2 = mock_executors[1]
+        mock_exec_1.revoke_task.side_effect = NotImplementedError
+        mock_exec_2.revoke_task.side_effect = NotImplementedError
 
         with 
mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as 
loader_mock:
             # The executors are mocked, so cannot be loaded/imported. Mock 
load_executor and return the
             # correct object for the given input executor name.
             loader_mock.side_effect = lambda *x: {
-                ("default_exec",): mock_executors[0],
-                (None,): mock_executors[0],
-                ("secondary_exec",): mock_executors[1],
+                ("default_exec",): mock_exec_1,
+                (None,): mock_exec_1,
+                ("secondary_exec",): mock_exec_2,
             }[x]
-            job_runner._fail_tasks_stuck_in_queued()
+            job_runner._handle_tasks_stuck_in_queued()
 
         # Default executor is called for ti1 (no explicit executor override 
uses default) and ti2 (where we
         # explicitly marked that for execution by the default executor)
         try:
-            
mock_executors[0].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti1, 
ti2])
+            
mock_exec_1.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti1, ti2])
         except AssertionError:
-            
mock_executors[0].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, 
ti1])
-        
mock_executors[1].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3])
+            
mock_exec_1.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, ti1])
+        
mock_exec_2.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3])
+
+    @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
+    def test_handle_stuck_queued_tasks_multiple_attempts(self, dag_maker, 
session, mock_executors):
+        """Verify that tasks stuck in queued will be rescheduled up to N 
times."""
+        with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
+            EmptyOperator(task_id="op1")
+            EmptyOperator(task_id="op2", executor="default_exec")
+
+        def _queue_tasks(tis):
+            for ti in tis:
+                ti.state = "queued"
+                ti.queued_dttm = timezone.utcnow()
+            session.commit()
+
+        run_id = str(uuid4())
+        dr = dag_maker.create_dagrun(run_id=run_id)
+
+        tis = dr.get_task_instances(session=session)
+        _queue_tasks(tis=tis)
+        scheduler_job = Job()
+        scheduler = SchedulerJobRunner(job=scheduler_job, num_runs=0)
+        # job_runner._reschedule_stuck_task = MagicMock()
+        scheduler._task_queued_timeout = -300  # always in violation of timeout
+
+        with _loader_mock(mock_executors):
+            scheduler._handle_tasks_stuck_in_queued(session=session)
+
+        # If the task gets stuck in queued once, we reset it to scheduled
+        tis = dr.get_task_instances(session=session)
+        assert [x.state for x in tis] == ["scheduled", "scheduled"]
+        assert [x.queued_dttm for x in tis] == [None, None]
+
+        _queue_tasks(tis=tis)
+        log_events = [x.event for x in 
session.scalars(select(Log).where(Log.run_id == run_id)).all()]
+        assert log_events == [
+            "stuck in queued reschedule",
+            "stuck in queued reschedule",
+        ]
+
+        with _loader_mock(mock_executors):
+            scheduler._handle_tasks_stuck_in_queued(session=session)
+        session.commit()
+
+        log_events = [x.event for x in 
session.scalars(select(Log).where(Log.run_id == run_id)).all()]
+        assert log_events == [
+            "stuck in queued reschedule",
+            "stuck in queued reschedule",
+            "stuck in queued reschedule",
+            "stuck in queued reschedule",
+        ]
+        mock_executors[0].fail.assert_not_called()
+        tis = dr.get_task_instances(session=session)
+        assert [x.state for x in tis] == ["scheduled", "scheduled"]
+        _queue_tasks(tis=tis)
+
+        with _loader_mock(mock_executors):
+            scheduler._handle_tasks_stuck_in_queued(session=session)
+        session.commit()
+        log_events = [x.event for x in 
session.scalars(select(Log).where(Log.run_id == run_id)).all()]
+        assert log_events == [
+            "stuck in queued reschedule",
+            "stuck in queued reschedule",
+            "stuck in queued reschedule",
+            "stuck in queued reschedule",
+            "stuck in queued tries exceeded",
+            "stuck in queued tries exceeded",
+        ]
+
+        mock_executors[0].fail.assert_not_called()  # just demoing that we 
don't fail with executor method
+        states = [x.state for x in dr.get_task_instances(session=session)]
+        assert states == ["failed", "failed"]
 
-    def test_fail_stuck_queued_tasks_raises_not_implemented(self, dag_maker, 
session, caplog):
+    def test_revoke_task_not_imp_tolerated(self, dag_maker, session, caplog):
+        """Test that if executor no implement revoke_task then we don't blow 
up."""
         with dag_maker("test_fail_stuck_queued_tasks"):
             op1 = EmptyOperator(task_id="op1")
 
@@ -2224,12 +2326,14 @@ class TestSchedulerJob:
         session.commit()
         from airflow.executors.local_executor import LocalExecutor
 
+        assert "revoke_task" in BaseExecutor.__dict__
+        # this is just verifying that LocalExecutor is good enough for this 
test
+        # in that it does not implement revoke_task
+        assert "revoke_task" not in LocalExecutor.__dict__
         scheduler_job = Job(executor=LocalExecutor())
         job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0)
         job_runner._task_queued_timeout = 300
-        with caplog.at_level(logging.DEBUG):
-            job_runner._fail_tasks_stuck_in_queued()
-        assert "Executor doesn't support cleanup of stuck queued tasks. 
Skipping." in caplog.text
+        job_runner._handle_tasks_stuck_in_queued()
 
     @mock.patch("airflow.dag_processing.manager.DagFileProcessorAgent")
     def test_executor_end_called(self, mock_processor_agent, mock_executors):

Reply via email to