This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a825c95afab Don't create new session in stuck queue reschedule handler 
(#44192)
a825c95afab is described below

commit a825c95afab95b23f20aee2306eca81942a8a405
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Nov 19 13:49:51 2024 -0800

    Don't create new session in stuck queue reschedule handler (#44192)
    
    This is a follow-up fix to #43520.
    
    It does not make a material difference; I'm just avoiding the session
decorator, and its create/dispose session logic, where it is not needed. I
also commit as I go, since there is no reason to handle multiple distinct task
instances (TIs) in the same transaction.
---
 airflow/jobs/scheduler_job_runner.py | 8 ++++----
 tests/jobs/test_scheduler_job.py     | 9 +++------
 2 files changed, 7 insertions(+), 10 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index dda49e4f843..96a09eb99bb 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1800,6 +1800,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                         ti=ti,
                         session=session,
                     )
+                    session.commit()
             except NotImplementedError:
                 # this block only gets entered if the executor has not 
implemented `revoke_task`.
                 # in which case, we try the fallback logic
@@ -1838,7 +1839,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     ),
                 )
             )
-            self._reschedule_stuck_task(ti)
+            self._reschedule_stuck_task(ti, session=session)
         else:
             self.log.info(
                 "Task requeue attempts exceeded max; marking failed. 
task_instance=%s",
@@ -1875,8 +1876,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     ti_repr,
                 )
 
-    @provide_session
-    def _reschedule_stuck_task(self, ti, session=NEW_SESSION):
+    def _reschedule_stuck_task(self, ti: TaskInstance, session: Session):
         session.execute(
             update(TI)
             .where(TI.filter_for_tis([ti]))
@@ -1890,7 +1890,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
     @provide_session
     def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: 
Session = NEW_SESSION) -> int:
         """
-        Check the Log table to see how many times a taskinstance has been 
stuck in queued.
+        Check the Log table to see how many times a task instance has been 
stuck in queued.
 
         We can then use this information to determine whether to reschedule a 
task or fail it.
         """
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 12423157ccb..fbf1d4228b5 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2276,8 +2276,7 @@ class TestSchedulerJob:
         scheduler._task_queued_timeout = -300  # always in violation of timeout
 
         with _loader_mock(mock_executors):
-            scheduler._handle_tasks_stuck_in_queued(session=session)
-
+            scheduler._handle_tasks_stuck_in_queued()
         # If the task gets stuck in queued once, we reset it to scheduled
         tis = dr.get_task_instances(session=session)
         assert [x.state for x in tis] == ["scheduled", "scheduled"]
@@ -2291,8 +2290,7 @@ class TestSchedulerJob:
         ]
 
         with _loader_mock(mock_executors):
-            scheduler._handle_tasks_stuck_in_queued(session=session)
-        session.commit()
+            scheduler._handle_tasks_stuck_in_queued()
 
         log_events = [x.event for x in 
session.scalars(select(Log).where(Log.run_id == run_id)).all()]
         assert log_events == [
@@ -2307,8 +2305,7 @@ class TestSchedulerJob:
         _queue_tasks(tis=tis)
 
         with _loader_mock(mock_executors):
-            scheduler._handle_tasks_stuck_in_queued(session=session)
-        session.commit()
+            scheduler._handle_tasks_stuck_in_queued()
         log_events = [x.event for x in 
session.scalars(select(Log).where(Log.run_id == run_id)).all()]
         assert log_events == [
             "stuck in queued reschedule",

Reply via email to