This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch v2-11-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-11-test by this push:
new 2552a3641bf Correctly treat requeuet on reschedule sensors as
resetting after each reschedule (#51410) (#52638)
2552a3641bf is described below
commit 2552a3641bfee8ecf986ba3a1a1b252c3a52fb11
Author: Rahul Vats <[email protected]>
AuthorDate: Tue Jul 1 17:01:51 2025 +0530
Correctly treat requeuet on reschedule sensors as resetting after each
reschedule (#51410) (#52638)
(cherry picked from commit a3621016840d8dba97777954333f6b214d4f3363)
Co-authored-by: Collin McNulty <[email protected]>
---
airflow/jobs/scheduler_job_runner.py | 27 +++++++---
tests/jobs/test_scheduler_job.py | 99 ++++++++++++++++++++++++++++++++++++
2 files changed, 120 insertions(+), 6 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index c9afd40f719..6e6a54a1758 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -33,7 +33,7 @@ from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator
from deprecated import deprecated
-from sqlalchemy import and_, delete, func, not_, or_, select, text, update
+from sqlalchemy import and_, delete, desc, func, not_, or_, select, text,
update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
from sqlalchemy.sql import expression
@@ -1899,19 +1899,34 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
We can then use this information to determine whether to reschedule a
task or fail it.
"""
- return (
- session.query(Log)
+ last_running_time = session.scalar(
+ select(Log.dttm)
.where(
- Log.task_id == ti.task_id,
Log.dag_id == ti.dag_id,
+ Log.task_id == ti.task_id,
Log.run_id == ti.run_id,
Log.map_index == ti.map_index,
Log.try_number == ti.try_number,
- Log.event == TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
+ Log.event == "running",
)
- .count()
+ .order_by(desc(Log.dttm))
+ .limit(1)
+ )
+
+ query = session.query(Log).where(
+ Log.task_id == ti.task_id,
+ Log.dag_id == ti.dag_id,
+ Log.run_id == ti.run_id,
+ Log.map_index == ti.map_index,
+ Log.try_number == ti.try_number,
+ Log.event == TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
)
+ if last_running_time:
+ query = query.where(Log.dttm > last_running_time)
+
+ return query.count()
+
@provide_session
def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
from airflow.models.pool import Pool
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 97d3e9fe87d..f8a879145df 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2314,6 +2314,105 @@ class TestSchedulerJob:
states = [x.state for x in dr.get_task_instances(session=session)]
assert states == ["failed", "failed"]
+ @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
+ def test_handle_stuck_queued_tasks_reschedule_sensors(self, dag_maker,
session, mock_executors):
+ """Reschedule sensors go in and out of running repeatedly using the
same try_number
+ Make sure that they get three attempts per reschedule, not 3 attempts
per try_number"""
+ with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
+ EmptyOperator(task_id="op1")
+ EmptyOperator(task_id="op2", executor="default_exec")
+
+ def _queue_tasks(tis):
+ for ti in tis:
+ ti.state = "queued"
+ ti.queued_dttm = timezone.utcnow()
+ session.commit()
+
+ def _add_running_event(tis):
+ for ti in tis:
+ updated_entry = Log(
+ dttm=timezone.utcnow(),
+ dag_id=ti.dag_id,
+ task_id=ti.task_id,
+ map_index=ti.map_index,
+ event="running",
+ run_id=ti.run_id,
+ try_number=ti.try_number,
+ )
+ session.add(updated_entry)
+
+ run_id = str(uuid4())
+ dr = dag_maker.create_dagrun(run_id=run_id)
+
+ tis = dr.get_task_instances(session=session)
+ _queue_tasks(tis=tis)
+ scheduler_job = Job()
+ scheduler = SchedulerJobRunner(job=scheduler_job, num_runs=0)
+ # job_runner._reschedule_stuck_task = MagicMock()
+ scheduler._task_queued_timeout = -300 # always in violation of timeout
+
+ with _loader_mock(mock_executors):
+ scheduler._handle_tasks_stuck_in_queued()
+ # If the task gets stuck in queued once, we reset it to scheduled
+ tis = dr.get_task_instances(session=session)
+ assert [x.state for x in tis] == ["scheduled", "scheduled"]
+ assert [x.queued_dttm for x in tis] == [None, None]
+
+ _queue_tasks(tis=tis)
+ log_events = [
+ x.event for x in session.scalars(select(Log).where(Log.run_id ==
run_id).order_by(Log.id)).all()
+ ]
+ assert log_events == [
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ ]
+
+ with _loader_mock(mock_executors):
+ scheduler._handle_tasks_stuck_in_queued()
+
+ log_events = [
+ x.event for x in session.scalars(select(Log).where(Log.run_id ==
run_id).order_by(Log.id)).all()
+ ]
+ assert log_events == [
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ ]
+ mock_executors[0].fail.assert_not_called()
+ tis = dr.get_task_instances(session=session)
+ assert [x.state for x in tis] == ["scheduled", "scheduled"]
+
+ _add_running_event(tis) # This should "reset" the count of stuck
queued
+
+ for _ in range(3): # Should be able to be stuck 3 more times before
failing
+ _queue_tasks(tis=tis)
+ with _loader_mock(mock_executors):
+ scheduler._handle_tasks_stuck_in_queued()
+ tis = dr.get_task_instances(session=session)
+
+ log_events = [
+ x.event for x in session.scalars(select(Log).where(Log.run_id ==
run_id).order_by(Log.id)).all()
+ ]
+ assert log_events == [
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ "running",
+ "running",
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ "stuck in queued reschedule",
+ "stuck in queued tries exceeded",
+ "stuck in queued tries exceeded",
+ ]
+
+ mock_executors[0].fail.assert_not_called() # just demoing that we
don't fail with executor method
+ states = [x.state for x in dr.get_task_instances(session=session)]
+ assert states == ["failed", "failed"]
+
def test_revoke_task_not_imp_tolerated(self, dag_maker, session, caplog):
"""Test that if executor no implement revoke_task then we don't blow
up."""
with dag_maker("test_fail_stuck_queued_tasks"):