Repository: incubator-airflow Updated Branches: refs/heads/master 6fb94630c -> d9bba86e9
[AIRFLOW-41] Fix pool oversubscription The scheduler would send tasks to the queue for "open minus running" instances. When a task was eventually picked up and saw that a slot was free, it would run; this check is a race condition, because multiple tasks can compete for the same slot. If the slot was not free, the task would be set back to QUEUED (or SCHEDULED), i.e. returned to the scheduler for another run. In specific cases, a couple of task instances would suffer from the non-atomic read and be run anyway, oversubscribing the pool. The fix counts queued task instances against the pool's open slots and removes the run-time PoolHasSpaceDep check. Closes #1872 from gtoonstra/feature/AIRFLOW-41 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d9bba86e Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d9bba86e Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d9bba86e Branch: refs/heads/master Commit: d9bba86e9262e6133a8accf2d01dd94601c1579b Parents: 6fb9463 Author: gtoonstra <[email protected]> Authored: Tue Nov 29 21:39:27 2016 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Tue Nov 29 21:39:44 2016 +0100 ---------------------------------------------------------------------- airflow/models.py | 3 +- airflow/ti_deps/dep_context.py | 4 +- airflow/ti_deps/deps/pool_has_space_dep.py | 26 ----------- tests/jobs.py | 60 ++++++++++++++++++++++++- tests/models.py | 6 +-- 5 files changed, 65 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9bba86e/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 61ea359..d9bacbd 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -3980,7 +3980,8 @@ class Pool(Base): Returns the number of slots open at the moment """ used_slots = self.used_slots(session=session) - return self.slots - used_slots + queued_slots = 
self.queued_slots(session=session) + return self.slots - used_slots - queued_slots class SlaMiss(Base): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9bba86e/airflow/ti_deps/dep_context.py ---------------------------------------------------------------------- diff --git a/airflow/ti_deps/dep_context.py b/airflow/ti_deps/dep_context.py index ff24de0..73ae924 100644 --- a/airflow/ti_deps/dep_context.py +++ b/airflow/ti_deps/dep_context.py @@ -17,7 +17,6 @@ from airflow.ti_deps.deps.dagrun_exists_dep import DagrunRunningDep from airflow.ti_deps.deps.exec_date_after_start_date_dep import ExecDateAfterStartDateDep from airflow.ti_deps.deps.not_running_dep import NotRunningDep from airflow.ti_deps.deps.not_skipped_dep import NotSkippedDep -from airflow.ti_deps.deps.pool_has_space_dep import PoolHasSpaceDep from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep from airflow.ti_deps.deps.valid_state_dep import ValidStateDep from airflow.utils.state import State @@ -98,8 +97,7 @@ QUEUE_DEPS = MIN_EXEC_DEPS | { # Dependencies that need to be met for a given task instance to be able to get run by an # executor. This class just extends QueueContext by adding dependencies for resources. RUN_DEPS = QUEUE_DEPS | { - DagTISlotsAvailableDep(), - PoolHasSpaceDep(), + DagTISlotsAvailableDep() } # TODO(aoen): SCHEDULER_DEPS is not coupled to actual execution in any way and http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9bba86e/airflow/ti_deps/deps/pool_has_space_dep.py ---------------------------------------------------------------------- diff --git a/airflow/ti_deps/deps/pool_has_space_dep.py b/airflow/ti_deps/deps/pool_has_space_dep.py deleted file mode 100644 index 6808e5e..0000000 --- a/airflow/ti_deps/deps/pool_has_space_dep.py +++ /dev/null @@ -1,26 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from airflow.ti_deps.deps.base_ti_dep import BaseTIDep -from airflow.utils.db import provide_session - - -class PoolHasSpaceDep(BaseTIDep): - NAME = "DAG's Pool Has Space" - IGNOREABLE = True - - @provide_session - def _get_dep_statuses(self, ti, session, dep_context): - if ti.pool_full(): - yield self._failing_status( - reason="Task's pool '{0}' is full.".format(ti.pool)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9bba86e/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 5562177..bb74709 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -32,7 +32,8 @@ from airflow.operators.dummy_operator import DummyOperator from airflow.utils.db import provide_session from airflow.utils.state import State from airflow.utils.timeout import timeout - +from airflow.utils.dag_processing import SimpleDagBag +from mock import patch from tests.executor.test_executor import TestExecutor from airflow import configuration @@ -740,6 +741,63 @@ class SchedulerJobTest(unittest.TestCase): (dag.dag_id, dag_task1.task_id, DEFAULT_DATE) ) + @patch.object(TI, 'pool_full') + def test_scheduler_verify_pool_full(self, mock_pool_full): + """ + Test task instances not queued when pool is full + """ + mock_pool_full.return_value = False + + dag = DAG( + dag_id='test_scheduler_verify_pool_full', + start_date=DEFAULT_DATE) + + DummyOperator( + task_id='dummy', + dag=dag, + owner='airflow', + pool='test_scheduler_verify_pool_full') + + session = settings.Session() + pool = 
Pool(pool='test_scheduler_verify_pool_full', slots=1) + session.add(pool) + orm_dag = DagModel(dag_id=dag.dag_id) + orm_dag.is_paused = False + session.merge(orm_dag) + session.commit() + + scheduler = SchedulerJob() + dag.clear() + + # Create 2 dagruns, which will create 2 task instances. + dr = scheduler.create_dag_run(dag) + self.assertIsNotNone(dr) + self.assertEquals(dr.execution_date, DEFAULT_DATE) + dr = scheduler.create_dag_run(dag) + self.assertIsNotNone(dr) + queue = [] + scheduler._process_task_instances(dag, queue=queue) + self.assertEquals(len(queue), 2) + dagbag = SimpleDagBag([dag]) + + # Recreated part of the scheduler here, to kick off tasks -> executor + for ti_key in queue: + task = dag.get_task(ti_key[1]) + ti = models.TaskInstance(task, ti_key[2]) + # Task starts out in the scheduled state. All tasks in the + # scheduled state will be sent to the executor + ti.state = State.SCHEDULED + + # Also save this task instance to the DB. + session.merge(ti) + session.commit() + + scheduler._execute_task_instances(dagbag, + (State.SCHEDULED, + State.UP_FOR_RETRY)) + + self.assertEquals(len(scheduler.executor.queued_tasks), 1) + def test_scheduler_auto_align(self): """ Test if the schedule_interval will be auto aligned with the start_date http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9bba86e/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index 6354e71..74103fe 100644 --- a/tests/models.py +++ b/tests/models.py @@ -292,8 +292,8 @@ class TaskInstanceTest(unittest.TestCase): @patch.object(TI, 'pool_full') def test_run_pooling_task(self, mock_pool_full): """ - test that running task with mark_success param update task state as - SUCCESS without running task. + test that running task update task state as without running task. 
+ (no dependency check in ti_deps anymore, so also -> SUCCESS) """ # Mock the pool out with a full pool because the pool doesn't actually exist mock_pool_full.return_value = True @@ -305,7 +305,7 @@ class TaskInstanceTest(unittest.TestCase): ti = TI( task=task, execution_date=datetime.datetime.now()) ti.run() - self.assertEqual(ti.state, models.State.QUEUED) + self.assertEqual(ti.state, models.State.SUCCESS) @patch.object(TI, 'pool_full') def test_run_pooling_task_with_mark_success(self, mock_pool_full):
