This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a825c95afab Don't create new session in stuck queue reschedule handler
(#44192)
a825c95afab is described below
commit a825c95afab95b23f20aee2306eca81942a8a405
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Nov 19 13:49:51 2024 -0800
Don't create new session in stuck queue reschedule handler (#44192)
This is a follow-up fix to #43520.
It does not really make a material difference; I'm just avoiding use of
the session decorator, and its create / dispose session logic, where it is not
needed. I also commit as I go along, since there's no reason to handle multiple
distinct TIs in the same transaction.
---
airflow/jobs/scheduler_job_runner.py | 8 ++++----
tests/jobs/test_scheduler_job.py | 9 +++------
2 files changed, 7 insertions(+), 10 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index dda49e4f843..96a09eb99bb 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1800,6 +1800,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
ti=ti,
session=session,
)
+ session.commit()
except NotImplementedError:
# this block only gets entered if the executor has not
implemented `revoke_task`.
# in which case, we try the fallback logic
@@ -1838,7 +1839,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
),
)
)
- self._reschedule_stuck_task(ti)
+ self._reschedule_stuck_task(ti, session=session)
else:
self.log.info(
"Task requeue attempts exceeded max; marking failed.
task_instance=%s",
@@ -1875,8 +1876,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
ti_repr,
)
- @provide_session
- def _reschedule_stuck_task(self, ti, session=NEW_SESSION):
+ def _reschedule_stuck_task(self, ti: TaskInstance, session: Session):
session.execute(
update(TI)
.where(TI.filter_for_tis([ti]))
@@ -1890,7 +1890,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
@provide_session
def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session:
Session = NEW_SESSION) -> int:
"""
- Check the Log table to see how many times a taskinstance has been
stuck in queued.
+ Check the Log table to see how many times a task instance has been
stuck in queued.
We can then use this information to determine whether to reschedule a
task or fail it.
"""
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 12423157ccb..fbf1d4228b5 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2276,8 +2276,7 @@ class TestSchedulerJob:
scheduler._task_queued_timeout = -300 # always in violation of timeout
with _loader_mock(mock_executors):
- scheduler._handle_tasks_stuck_in_queued(session=session)
-
+ scheduler._handle_tasks_stuck_in_queued()
# If the task gets stuck in queued once, we reset it to scheduled
tis = dr.get_task_instances(session=session)
assert [x.state for x in tis] == ["scheduled", "scheduled"]
@@ -2291,8 +2290,7 @@ class TestSchedulerJob:
]
with _loader_mock(mock_executors):
- scheduler._handle_tasks_stuck_in_queued(session=session)
- session.commit()
+ scheduler._handle_tasks_stuck_in_queued()
log_events = [x.event for x in
session.scalars(select(Log).where(Log.run_id == run_id)).all()]
assert log_events == [
@@ -2307,8 +2305,7 @@ class TestSchedulerJob:
_queue_tasks(tis=tis)
with _loader_mock(mock_executors):
- scheduler._handle_tasks_stuck_in_queued(session=session)
- session.commit()
+ scheduler._handle_tasks_stuck_in_queued()
log_events = [x.event for x in
session.scalars(select(Log).where(Log.run_id == run_id)).all()]
assert log_events == [
"stuck in queued reschedule",