Repository: incubator-airflow
Updated Branches:
  refs/heads/master 656d045e9 -> 5ee0209cb
[AIRFLOW-1247] Fix ignore_all_dependencies argument ignored

Closes #2327 from mremes/patch-1

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5ee0209c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5ee0209c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5ee0209c

Branch: refs/heads/master
Commit: 5ee0209cbe40eeacbd8281f087a139a10239dd49
Parents: 656d045
Author: Matti Remes <re...@iki.fi>
Authored: Wed Sep 20 09:54:20 2017 -0700
Committer: Chris Riccomini <criccom...@apache.org>
Committed: Wed Sep 20 09:54:20 2017 -0700

----------------------------------------------------------------------
 tests/dags/test_cli_triggered_dags.py | 48 ++++++++++++++++++++++++++++
 tests/jobs.py                         | 51 ++++++++++++++++++++++++++++++
 2 files changed, 99 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5ee0209c/tests/dags/test_cli_triggered_dags.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_cli_triggered_dags.py b/tests/dags/test_cli_triggered_dags.py
new file mode 100644
index 0000000..5af8fc8
--- /dev/null
+++ b/tests/dags/test_cli_triggered_dags.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from datetime import datetime, timedelta
+from airflow.models import DAG
+from airflow.operators.python_operator import PythonOperator
+
+DEFAULT_DATE = datetime(2016, 1, 1)
+default_args = dict(
+    start_date=DEFAULT_DATE,
+    owner='airflow')
+
+
+def fail():
+    raise ValueError('Expected failure.')
+
+
+def success(ti=None, *args, **kwargs):
+    if ti.execution_date != DEFAULT_DATE + timedelta(days=1):
+        fail()
+    return
+
+
+# DAG tests that tasks ignore all dependencies
+
+dag1 = DAG(dag_id='test_run_ignores_all_dependencies', default_args=dict(depends_on_past=True, **default_args))
+dag1_task1 = PythonOperator(
+    task_id='test_run_dependency_task',
+    python_callable=fail,
+    dag=dag1,)
+dag1_task2 = PythonOperator(
+    task_id='test_run_dependent_task',
+    python_callable=success,
+    provide_context=True,
+    dag=dag1,)
+dag1_task1.set_downstream(dag1_task2)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5ee0209c/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 3039e38..fc2a6b7 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -278,6 +278,57 @@ class BackfillJobTest(unittest.TestCase):
         ti.refresh_from_db()
         self.assertEquals(ti.state, State.SUCCESS)
 
+    def test_run_ignores_all_dependencies(self):
+        """
+        Test that run respects ignore_all_dependencies
+        """
+        dag_id = 'test_run_ignores_all_dependencies'
+
+        dag = self.dagbag.get_dag('test_run_ignores_all_dependencies')
+        dag.clear()
+
+        task0_id = 'test_run_dependent_task'
+        args0 = ['run',
+                 '-A',
+                 dag_id,
+                 task0_id,
+                 DEFAULT_DATE.isoformat()]
+        cli.run(self.parser.parse_args(args0))
+        ti_dependent0 = TI(
+            task=dag.get_task(task0_id),
+            execution_date=DEFAULT_DATE)
+
+        ti_dependent0.refresh_from_db()
+        self.assertEquals(ti_dependent0.state, State.FAILED)
+
+        task1_id = 'test_run_dependency_task'
+        args1 = ['run',
+                 '-A',
+                 dag_id,
+                 task1_id,
+                 (DEFAULT_DATE + datetime.timedelta(days=1)).isoformat()]
+        cli.run(self.parser.parse_args(args1))
+
+        ti_dependency = TI(
+            task=dag.get_task(task1_id),
+            execution_date=DEFAULT_DATE + datetime.timedelta(days=1))
+        ti_dependency.refresh_from_db()
+        self.assertEquals(ti_dependency.state, State.FAILED)
+
+        task2_id = 'test_run_dependent_task'
+        args2 = ['run',
+                 '-A',
+                 dag_id,
+                 task2_id,
+                 (DEFAULT_DATE + datetime.timedelta(days=1)).isoformat()]
+        cli.run(self.parser.parse_args(args2))
+
+        ti_dependent = TI(
+            task=dag.get_task(task2_id),
+            execution_date=DEFAULT_DATE + datetime.timedelta(days=1))
+        ti_dependent.refresh_from_db()
+        self.assertEquals(ti_dependent.state, State.SUCCESS)
+
     def test_cli_backfill_depends_on_past(self):
         """
         Test that CLI respects -I argument