Repository: incubator-airflow
Updated Branches:
  refs/heads/master 632a99a2f -> 601045027
[AIRFLOW-1627] Only query pool in SubDAG init when necessary

When checking for pool conflicts in a SubDAG, ensure that a task in the SubDAG is actually in the same pool as the SubDagOperator itself to avoid querying the database unnecessarily.

Closes #2620 from dhuang/AIRFLOW-1627

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/60104502
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/60104502
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/60104502

Branch: refs/heads/master
Commit: 601045027212b0fdd9899d1eec0dfa438ecb0450
Parents: 632a99a
Author: Daniel Huang <[email protected]>
Authored: Fri Sep 22 12:40:40 2017 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Fri Sep 22 12:40:46 2017 -0700

---------------------------------------------------------------------- airflow/operators/subdag_operator.py | 36 ++++++++++++++++--------------- tests/operators/subdag_operator.py | 30 ++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/60104502/airflow/operators/subdag_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/subdag_operator.py b/airflow/operators/subdag_operator.py index f4f008d..9445c4c 100644 --- a/airflow/operators/subdag_operator.py +++ b/airflow/operators/subdag_operator.py @@ -60,25 +60,27 @@ class SubDagOperator(BaseOperator): # validate that subdag operator and subdag tasks don't have a # pool conflict if self.pool: - pool = ( - session - .query(Pool) - .filter(Pool.slots == 1) - .filter(Pool.pool == self.pool) - .first() - ) conflicts = [t for t in subdag.tasks if t.pool == self.pool] - if pool and any(t.pool == self.pool for t in subdag.tasks): - raise AirflowException( - 'SubDagOperator 
{sd} and subdag task{plural} {t} both use ' - 'pool {p}, but the pool only has 1 slot. The subdag tasks' - 'will never run.'.format( - sd=self.task_id, - plural=len(conflicts) > 1, - t=', '.join(t.task_id for t in conflicts), - p=self.pool - ) + if conflicts: + # only query for pool conflicts if one may exist + pool = ( + session + .query(Pool) + .filter(Pool.slots == 1) + .filter(Pool.pool == self.pool) + .first() ) + if pool and any(t.pool == self.pool for t in subdag.tasks): + raise AirflowException( + 'SubDagOperator {sd} and subdag task{plural} {t} both ' + 'use pool {p}, but the pool only has 1 slot. The ' + 'subdag tasks will never run.'.format( + sd=self.task_id, + plural=len(conflicts) > 1, + t=', '.join(t.task_id for t in conflicts), + p=self.pool + ) + ) self.subdag = subdag self.executor = executor http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/60104502/tests/operators/subdag_operator.py ---------------------------------------------------------------------- diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py index 5f049e7..9224f63 100644 --- a/tests/operators/subdag_operator.py +++ b/tests/operators/subdag_operator.py @@ -16,6 +16,8 @@ import datetime import os import unittest +from mock import Mock + import airflow from airflow.models import DAG, DagBag from airflow.operators.bash_operator import BashOperator @@ -95,6 +97,34 @@ class SubDagOperatorTests(unittest.TestCase): session.delete(pool_10) session.commit() + def test_subdag_pools_no_possible_conflict(self): + """ + Subdags and subdag tasks with no pool overlap, should not to query + pools + """ + dag = DAG('parent', default_args=default_args) + subdag = DAG('parent.child', default_args=default_args) + + session = airflow.settings.Session() + pool_1 = airflow.models.Pool(pool='test_pool_1', slots=1) + pool_10 = airflow.models.Pool(pool='test_pool_10', slots=10) + session.add(pool_1) + session.add(pool_10) + session.commit() + + dummy_1 = 
DummyOperator( + task_id='dummy', dag=subdag, pool='test_pool_10') + + mock_session = Mock() + SubDagOperator( + task_id='child', dag=dag, subdag=subdag, pool='test_pool_1', + session=mock_session) + self.assertFalse(mock_session.query.called) + + session.delete(pool_1) + session.delete(pool_10) + session.commit() + def test_subdag_deadlock(self): dagbag = DagBag() dag = dagbag.get_dag('test_subdag_deadlock')
