Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-9-test f5d46fa70 -> 54cfe3d29


[AIRFLOW-1627] Only query pool in SubDAG init when necessary

When checking for pool conflicts in a SubDAG, ensure that a task in
the SubDAG is actually in the same pool as the SubDagOperator itself
to avoid querying the database unnecessarily.

Closes #2620 from dhuang/AIRFLOW-1627


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/516ace82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/516ace82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/516ace82

Branch: refs/heads/v1-9-test
Commit: 516ace82c4ce3517196973829c06a396336b823c
Parents: 8de9fde
Author: Daniel Huang <[email protected]>
Authored: Fri Sep 22 12:40:40 2017 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Fri Sep 22 12:42:38 2017 -0700

----------------------------------------------------------------------
 airflow/operators/subdag_operator.py | 36 ++++++++++++++++---------------
 tests/operators/subdag_operator.py   | 30 ++++++++++++++++++++++++++
 2 files changed, 49 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/516ace82/airflow/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/subdag_operator.py b/airflow/operators/subdag_operator.py
index f4f008d..9445c4c 100644
--- a/airflow/operators/subdag_operator.py
+++ b/airflow/operators/subdag_operator.py
@@ -60,25 +60,27 @@ class SubDagOperator(BaseOperator):
         # validate that subdag operator and subdag tasks don't have a
         # pool conflict
         if self.pool:
-            pool = (
-                session
-                .query(Pool)
-                .filter(Pool.slots == 1)
-                .filter(Pool.pool == self.pool)
-                .first()
-            )
             conflicts = [t for t in subdag.tasks if t.pool == self.pool]
-            if pool and any(t.pool == self.pool for t in subdag.tasks):
-                raise AirflowException(
-                    'SubDagOperator {sd} and subdag task{plural} {t} both use '
-                    'pool {p}, but the pool only has 1 slot. The subdag tasks'
-                    'will never run.'.format(
-                        sd=self.task_id,
-                        plural=len(conflicts) > 1,
-                        t=', '.join(t.task_id for t in conflicts),
-                        p=self.pool
-                    )
+            if conflicts:
+                # only query the DB when some subdag task shares our pool
+                pool = (
+                    session
+                    .query(Pool)
+                    .filter(Pool.slots == 1)
+                    .filter(Pool.pool == self.pool)
+                    .first()
                 )
+                if pool:  # conflicts is already known to be non-empty here
+                    raise AirflowException(
+                        'SubDagOperator {sd} and subdag task{plural} {t} both '
+                        'use pool {p}, but the pool only has 1 slot. The '
+                        'subdag tasks will never run.'.format(
+                            sd=self.task_id,
+                            plural='s' if len(conflicts) > 1 else '',
+                            t=', '.join(t.task_id for t in conflicts),
+                            p=self.pool
+                        )
+                    )
 
         self.subdag = subdag
         self.executor = executor

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/516ace82/tests/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py
index 5f049e7..9224f63 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -16,6 +16,8 @@ import datetime
 import os
 import unittest
 
+from mock import Mock
+
 import airflow
 from airflow.models import DAG, DagBag
 from airflow.operators.bash_operator import BashOperator
@@ -95,6 +97,34 @@ class SubDagOperatorTests(unittest.TestCase):
         session.delete(pool_10)
         session.commit()
 
+    def test_subdag_pools_no_possible_conflict(self):
+        """
+        A SubDagOperator whose subdag tasks share no pool with it should
+        not query the Pool table at all.
+        """
+        dag = DAG('parent', default_args=default_args)
+        subdag = DAG('parent.child', default_args=default_args)
+
+        session = airflow.settings.Session()
+        pool_1 = airflow.models.Pool(pool='test_pool_1', slots=1)
+        pool_10 = airflow.models.Pool(pool='test_pool_10', slots=10)
+        session.add(pool_1)
+        session.add(pool_10)
+        session.commit()
+
+        DummyOperator(
+            task_id='dummy', dag=subdag, pool='test_pool_10')
+
+        mock_session = Mock()
+        SubDagOperator(
+            task_id='child', dag=dag, subdag=subdag, pool='test_pool_1',
+            session=mock_session)
+        self.assertFalse(mock_session.query.called)
+
+        session.delete(pool_1)
+        session.delete(pool_10)
+        session.commit()
     def test_subdag_deadlock(self):
         dagbag = DagBag()
         dag = dagbag.get_dag('test_subdag_deadlock')

Reply via email to