This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5d743e71fd clearer method name in scheduler_job.py (#23702)
5d743e71fd is described below
commit 5d743e71fd0510954afc34e1344510a9c599f1bd
Author: Ryan Hatter <[email protected]>
AuthorDate: Wed May 18 18:06:16 2022 -0500
clearer method name in scheduler_job.py (#23702)
---
airflow/jobs/scheduler_job.py | 8 ++++----
tests/jobs/test_scheduler_job.py | 16 ++++++++--------
2 files changed, 12 insertions(+), 12 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 0a93d07ffb..22ba5decb4 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -553,9 +553,9 @@ class SchedulerJob(BaseJob):
queue=queue,
)
- def _critical_section_execute_task_instances(self, session: Session) ->
int:
+ def _critical_section_enqueue_task_instances(self, session: Session) ->
int:
"""
- Attempts to execute TaskInstances that should be executed by the
scheduler.
+ Enqueues TaskInstances for execution.
There are three steps:
1. Pick TIs by priority with the constraint that they are in the
expected states
@@ -910,7 +910,7 @@ class SchedulerJob(BaseJob):
- Then, via a Critical Section (locking the rows of the Pool model) we
queue tasks, and then send them
to the executor.
- See docs of _critical_section_execute_task_instances for more.
+ See docs of _critical_section_enqueue_task_instances for more.
:return: Number of TIs enqueued in this iteration
:rtype: int
@@ -958,7 +958,7 @@ class SchedulerJob(BaseJob):
timer.start()
# Find anything TIs in state SCHEDULED, try to QUEUE it
(send it to the executor)
- num_queued_tis =
self._critical_section_execute_task_instances(session=session)
+ num_queued_tis =
self._critical_section_enqueue_task_instances(session=session)
# Make sure we only sent this metric if we obtained the
lock, otherwise we'll skew the
# metric, way down
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index a4fe6a0b3e..b50c8bfd7f 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -395,7 +395,7 @@ class TestSchedulerJob:
(ti1,) = dr1.task_instances
ti1.state = State.SCHEDULED
- self.scheduler_job._critical_section_execute_task_instances(session)
+ self.scheduler_job._critical_section_enqueue_task_instances(session)
ti1.refresh_from_db(session=session)
assert State.SCHEDULED == ti1.state
session.rollback()
@@ -423,7 +423,7 @@ class TestSchedulerJob:
assert dr1.is_backfill
- self.scheduler_job._critical_section_execute_task_instances(session)
+ self.scheduler_job._critical_section_enqueue_task_instances(session)
session.flush()
ti1.refresh_from_db()
assert State.SCHEDULED == ti1.state
@@ -1258,7 +1258,7 @@ class TestSchedulerJob:
assert ti.state == State.NONE
mock_queue_command.assert_not_called()
- def test_critical_section_execute_task_instances(self, dag_maker):
+ def test_critical_section_enqueue_task_instances(self, dag_maker):
dag_id = 'SchedulerJobTest.test_execute_task_instances'
task_id_1 = 'dummy_task'
task_id_2 = 'dummy_task_nonexistent_queue'
@@ -1297,7 +1297,7 @@ class TestSchedulerJob:
assert State.RUNNING == dr2.state
- res =
self.scheduler_job._critical_section_execute_task_instances(session)
+ res =
self.scheduler_job._critical_section_enqueue_task_instances(session)
# check that max_active_tasks is respected
ti1.refresh_from_db()
@@ -1346,7 +1346,7 @@ class TestSchedulerJob:
ti2.state = State.SCHEDULED
session.flush()
self.scheduler_job.max_tis_per_query = 2
- res =
self.scheduler_job._critical_section_execute_task_instances(session)
+ res =
self.scheduler_job._critical_section_enqueue_task_instances(session)
assert 2 == res
self.scheduler_job.max_tis_per_query = 8
@@ -1356,9 +1356,9 @@ class TestSchedulerJob:
mock_slots.return_value = 2
# Check that we don't "overfill" the executor
assert 2 == res
- res =
self.scheduler_job._critical_section_execute_task_instances(session)
+ res =
self.scheduler_job._critical_section_enqueue_task_instances(session)
- res =
self.scheduler_job._critical_section_execute_task_instances(session)
+ res =
self.scheduler_job._critical_section_enqueue_task_instances(session)
assert 4 == res
for ti in tis:
ti.refresh_from_db()
@@ -1398,7 +1398,7 @@ class TestSchedulerJob:
self.scheduler_job.max_tis_per_query = 0
self.scheduler_job.executor = MagicMock(slots_available=36)
- res =
self.scheduler_job._critical_section_execute_task_instances(session)
+ res =
self.scheduler_job._critical_section_enqueue_task_instances(session)
# 20 dag runs * 2 tasks each = 40, but limited by number of slots
available
assert res == 36
session.rollback()