Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6fb94630c -> d9bba86e9


[AIRFLOW-41] Fix pool oversubscription

Scheduler would send tasks to the queue for "open minus running"
instances. If the task eventually got picked up and saw that a slot
was free (a race condition, because multiple tasks could compete for
the same slot), it would run. If the slot was not free, the task
would be set back to QUEUED (or SCHEDULED) and returned to the
scheduler for another attempt. In specific cases, a couple of task
instances would suffer from this non-atomic read and be run anyway,
oversubscribing the pool.

Closes #1872 from gtoonstra/feature/AIRFLOW-41


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d9bba86e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d9bba86e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d9bba86e

Branch: refs/heads/master
Commit: d9bba86e9262e6133a8accf2d01dd94601c1579b
Parents: 6fb9463
Author: gtoonstra <[email protected]>
Authored: Tue Nov 29 21:39:27 2016 +0100
Committer: Bolke de Bruin <[email protected]>
Committed: Tue Nov 29 21:39:44 2016 +0100

----------------------------------------------------------------------
 airflow/models.py                          |  3 +-
 airflow/ti_deps/dep_context.py             |  4 +-
 airflow/ti_deps/deps/pool_has_space_dep.py | 26 -----------
 tests/jobs.py                              | 60 ++++++++++++++++++++++++-
 tests/models.py                            |  6 +--
 5 files changed, 65 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9bba86e/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 61ea359..d9bacbd 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3980,7 +3980,8 @@ class Pool(Base):
         Returns the number of slots open at the moment
         """
         used_slots = self.used_slots(session=session)
-        return self.slots - used_slots
+        queued_slots = self.queued_slots(session=session)
+        return self.slots - used_slots - queued_slots
 
 
 class SlaMiss(Base):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9bba86e/airflow/ti_deps/dep_context.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/dep_context.py b/airflow/ti_deps/dep_context.py
index ff24de0..73ae924 100644
--- a/airflow/ti_deps/dep_context.py
+++ b/airflow/ti_deps/dep_context.py
@@ -17,7 +17,6 @@ from airflow.ti_deps.deps.dagrun_exists_dep import 
DagrunRunningDep
 from airflow.ti_deps.deps.exec_date_after_start_date_dep import 
ExecDateAfterStartDateDep
 from airflow.ti_deps.deps.not_running_dep import NotRunningDep
 from airflow.ti_deps.deps.not_skipped_dep import NotSkippedDep
-from airflow.ti_deps.deps.pool_has_space_dep import PoolHasSpaceDep
 from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep
 from airflow.ti_deps.deps.valid_state_dep import ValidStateDep
 from airflow.utils.state import State
@@ -98,8 +97,7 @@ QUEUE_DEPS = MIN_EXEC_DEPS | {
 # Dependencies that need to be met for a given task instance to be able to get 
run by an
 # executor. This class just extends QueueContext by adding dependencies for 
resources.
 RUN_DEPS = QUEUE_DEPS | {
-    DagTISlotsAvailableDep(),
-    PoolHasSpaceDep(),
+    DagTISlotsAvailableDep()
 }
 
 # TODO(aoen): SCHEDULER_DEPS is not coupled to actual execution in any way and

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9bba86e/airflow/ti_deps/deps/pool_has_space_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/pool_has_space_dep.py 
b/airflow/ti_deps/deps/pool_has_space_dep.py
deleted file mode 100644
index 6808e5e..0000000
--- a/airflow/ti_deps/deps/pool_has_space_dep.py
+++ /dev/null
@@ -1,26 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
-from airflow.utils.db import provide_session
-
-
-class PoolHasSpaceDep(BaseTIDep):
-    NAME = "DAG's Pool Has Space"
-    IGNOREABLE = True
-
-    @provide_session
-    def _get_dep_statuses(self, ti, session, dep_context):
-        if ti.pool_full():
-            yield self._failing_status(
-                reason="Task's pool '{0}' is full.".format(ti.pool))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9bba86e/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 5562177..bb74709 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -32,7 +32,8 @@ from airflow.operators.dummy_operator import DummyOperator
 from airflow.utils.db import provide_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
-
+from airflow.utils.dag_processing import SimpleDagBag
+from mock import patch
 from tests.executor.test_executor import TestExecutor
 
 from airflow import configuration
@@ -740,6 +741,63 @@ class SchedulerJobTest(unittest.TestCase):
             (dag.dag_id, dag_task1.task_id, DEFAULT_DATE)
         )
 
+    @patch.object(TI, 'pool_full')
+    def test_scheduler_verify_pool_full(self, mock_pool_full):
+        """
+        Test task instances not queued when pool is full
+        """
+        mock_pool_full.return_value = False
+
+        dag = DAG(
+            dag_id='test_scheduler_verify_pool_full',
+            start_date=DEFAULT_DATE)
+
+        DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow',
+            pool='test_scheduler_verify_pool_full')
+
+        session = settings.Session()
+        pool = Pool(pool='test_scheduler_verify_pool_full', slots=1)
+        session.add(pool)
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        orm_dag.is_paused = False
+        session.merge(orm_dag)
+        session.commit()
+
+        scheduler = SchedulerJob()
+        dag.clear()
+
+        # Create 2 dagruns, which will create 2 task instances.
+        dr = scheduler.create_dag_run(dag)
+        self.assertIsNotNone(dr)
+        self.assertEquals(dr.execution_date, DEFAULT_DATE)
+        dr = scheduler.create_dag_run(dag)
+        self.assertIsNotNone(dr)
+        queue = []
+        scheduler._process_task_instances(dag, queue=queue)
+        self.assertEquals(len(queue), 2)
+        dagbag = SimpleDagBag([dag])
+
+        # Recreated part of the scheduler here, to kick off tasks -> executor
+        for ti_key in queue:
+            task = dag.get_task(ti_key[1])
+            ti = models.TaskInstance(task, ti_key[2])
+            # Task starts out in the scheduled state. All tasks in the
+            # scheduled state will be sent to the executor
+            ti.state = State.SCHEDULED
+
+            # Also save this task instance to the DB.
+            session.merge(ti)
+            session.commit()
+
+        scheduler._execute_task_instances(dagbag,
+                                          (State.SCHEDULED,
+                                           State.UP_FOR_RETRY))
+
+        self.assertEquals(len(scheduler.executor.queued_tasks), 1)
+
     def test_scheduler_auto_align(self):
         """
         Test if the schedule_interval will be auto aligned with the start_date

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9bba86e/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 6354e71..74103fe 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -292,8 +292,8 @@ class TaskInstanceTest(unittest.TestCase):
     @patch.object(TI, 'pool_full')
     def test_run_pooling_task(self, mock_pool_full):
         """
-        test that running task with mark_success param update task state as
-        SUCCESS without running task.
+        test that running task update task state as  without running task.
+        (no dependency check in ti_deps anymore, so also -> SUCCESS)
         """
         # Mock the pool out with a full pool because the pool doesn't actually 
exist
         mock_pool_full.return_value = True
@@ -305,7 +305,7 @@ class TaskInstanceTest(unittest.TestCase):
         ti = TI(
             task=task, execution_date=datetime.datetime.now())
         ti.run()
-        self.assertEqual(ti.state, models.State.QUEUED)
+        self.assertEqual(ti.state, models.State.SUCCESS)
 
     @patch.object(TI, 'pool_full')
     def test_run_pooling_task_with_mark_success(self, mock_pool_full):

Reply via email to