Repository: incubator-airflow
Updated Branches:
  refs/heads/master 44551e249 -> e6973b159


[AIRFLOW-1157] Fix missing pools crashing the scheduler

Log a warning, instead of crashing the scheduler, when the pool
associated with a task instance doesn't exist. Non-existent pools
are treated as having the default value of 0 open slots, so tasks
assigned to them are not scheduled.

Closes #3002 from iansuvak/1157_nonexistent_pool


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e6973b15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e6973b15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e6973b15

Branch: refs/heads/master
Commit: e6973b1596914e5d62567e065223e7b169d1c26c
Parents: 44551e2
Author: Ian Suvak <ian.su...@gmail.com>
Authored: Fri Feb 9 11:04:38 2018 +0100
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Fri Feb 9 11:04:38 2018 +0100

----------------------------------------------------------------------
 airflow/jobs.py |  9 ++++++++-
 tests/jobs.py   | 24 ++++++++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6973b15/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 35a3fb6..00d6b22 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1097,7 +1097,14 @@ class SchedulerJob(BaseJob):
                 # non_pooled_task_slot_count per run
                 open_slots = conf.getint('core', 'non_pooled_task_slot_count')
             else:
-                open_slots = pools[pool].open_slots(session=session)
+                if pool not in pools:
+                    self.log.warning(
+                        "Tasks using non-existent pool '%s' will not be 
scheduled",
+                        pool
+                    )
+                    open_slots = 0
+                else:
+                    open_slots = pools[pool].open_slots(session=session)
 
             num_queued = len(task_instances)
             self.log.info(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6973b15/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 5771bf1..1c87b8f 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -1170,6 +1170,30 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertIn(tis[1].key, res_keys)
         self.assertIn(tis[3].key, res_keys)
 
+    def test_nonexistent_pool(self):
+        dag_id = 'SchedulerJobTest.test_nonexistent_pool'
+        task_id = 'dummy_wrong_pool'
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
+        task = DummyOperator(dag=dag, task_id=task_id, 
pool="this_pool_doesnt_exist")
+        dagbag = self._make_simple_dag_bag([dag])
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+
+        dr = scheduler.create_dag_run(dag)
+
+        ti = TI(task, dr.execution_date)
+        ti.state = State.SCHEDULED
+        session.merge(ti)
+        session.commit()
+
+        res = scheduler._find_executable_task_instances(
+            dagbag,
+            states=[State.SCHEDULED],
+            session=session)
+        session.commit()
+        self.assertEqual(0, len(res))
+
     def test_find_executable_task_instances_none(self):
         dag_id = 'SchedulerJobTest.test_find_executable_task_instances_none'
         task_id_1 = 'dummy'

Reply via email to