This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d743e71fd clearer method name in scheduler_job.py (#23702)
5d743e71fd is described below

commit 5d743e71fd0510954afc34e1344510a9c599f1bd
Author: Ryan Hatter <[email protected]>
AuthorDate: Wed May 18 18:06:16 2022 -0500

    clearer method name in scheduler_job.py (#23702)
---
 airflow/jobs/scheduler_job.py    |  8 ++++----
 tests/jobs/test_scheduler_job.py | 16 ++++++++--------
 2 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 0a93d07ffb..22ba5decb4 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -553,9 +553,9 @@ class SchedulerJob(BaseJob):
                 queue=queue,
             )
 
-    def _critical_section_execute_task_instances(self, session: Session) -> int:
+    def _critical_section_enqueue_task_instances(self, session: Session) -> int:
         """
-        Attempts to execute TaskInstances that should be executed by the scheduler.
+        Enqueues TaskInstances for execution.
 
         There are three steps:
         1. Pick TIs by priority with the constraint that they are in the expected states
@@ -910,7 +910,7 @@ class SchedulerJob(BaseJob):
         - Then, via a Critical Section (locking the rows of the Pool model) we queue tasks, and then send them
           to the executor.
 
-          See docs of _critical_section_execute_task_instances for more.
+          See docs of _critical_section_enqueue_task_instances for more.
 
         :return: Number of TIs enqueued in this iteration
         :rtype: int
@@ -958,7 +958,7 @@ class SchedulerJob(BaseJob):
                     timer.start()
 
                     # Find anything TIs in state SCHEDULED, try to QUEUE it (send it to the executor)
-                    num_queued_tis = self._critical_section_execute_task_instances(session=session)
+                    num_queued_tis = self._critical_section_enqueue_task_instances(session=session)
 
                     # Make sure we only sent this metric if we obtained the lock, otherwise we'll skew the
                     # metric, way down
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index a4fe6a0b3e..b50c8bfd7f 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -395,7 +395,7 @@ class TestSchedulerJob:
         (ti1,) = dr1.task_instances
         ti1.state = State.SCHEDULED
 
-        self.scheduler_job._critical_section_execute_task_instances(session)
+        self.scheduler_job._critical_section_enqueue_task_instances(session)
         ti1.refresh_from_db(session=session)
         assert State.SCHEDULED == ti1.state
         session.rollback()
@@ -423,7 +423,7 @@ class TestSchedulerJob:
 
         assert dr1.is_backfill
 
-        self.scheduler_job._critical_section_execute_task_instances(session)
+        self.scheduler_job._critical_section_enqueue_task_instances(session)
         session.flush()
         ti1.refresh_from_db()
         assert State.SCHEDULED == ti1.state
@@ -1258,7 +1258,7 @@ class TestSchedulerJob:
         assert ti.state == State.NONE
         mock_queue_command.assert_not_called()
 
-    def test_critical_section_execute_task_instances(self, dag_maker):
+    def test_critical_section_enqueue_task_instances(self, dag_maker):
         dag_id = 'SchedulerJobTest.test_execute_task_instances'
         task_id_1 = 'dummy_task'
         task_id_2 = 'dummy_task_nonexistent_queue'
@@ -1297,7 +1297,7 @@ class TestSchedulerJob:
 
         assert State.RUNNING == dr2.state
 
-        res = self.scheduler_job._critical_section_execute_task_instances(session)
+        res = self.scheduler_job._critical_section_enqueue_task_instances(session)
 
         # check that max_active_tasks is respected
         ti1.refresh_from_db()
@@ -1346,7 +1346,7 @@ class TestSchedulerJob:
             ti2.state = State.SCHEDULED
             session.flush()
         self.scheduler_job.max_tis_per_query = 2
-        res = self.scheduler_job._critical_section_execute_task_instances(session)
+        res = self.scheduler_job._critical_section_enqueue_task_instances(session)
         assert 2 == res
 
         self.scheduler_job.max_tis_per_query = 8
@@ -1356,9 +1356,9 @@ class TestSchedulerJob:
             mock_slots.return_value = 2
             # Check that we don't "overfill" the executor
             assert 2 == res
-            res = self.scheduler_job._critical_section_execute_task_instances(session)
+            res = self.scheduler_job._critical_section_enqueue_task_instances(session)
 
-        res = self.scheduler_job._critical_section_execute_task_instances(session)
+        res = self.scheduler_job._critical_section_enqueue_task_instances(session)
         assert 4 == res
         for ti in tis:
             ti.refresh_from_db()
@@ -1398,7 +1398,7 @@ class TestSchedulerJob:
         self.scheduler_job.max_tis_per_query = 0
         self.scheduler_job.executor = MagicMock(slots_available=36)
 
-        res = self.scheduler_job._critical_section_execute_task_instances(session)
+        res = self.scheduler_job._critical_section_enqueue_task_instances(session)
         # 20 dag runs * 2 tasks each = 40, but limited by number of slots available
         assert res == 36
         session.rollback()

Reply via email to