Merge branch 'AIRFLOW-719' into AIRFLOW-719-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/15fd4d98 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/15fd4d98 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/15fd4d98 Branch: refs/heads/master Commit: 15fd4d98d141766f81552d270c8b5c43b15f4f44 Parents: f2dae7d eb705fd Author: Bolke de Bruin <[email protected]> Authored: Tue Apr 4 11:55:20 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Tue Apr 4 11:55:20 2017 +0200 ---------------------------------------------------------------------- airflow/operators/latest_only_operator.py | 30 +++- airflow/operators/python_operator.py | 82 +++++++--- airflow/ti_deps/deps/trigger_rule_dep.py | 6 +- scripts/ci/requirements.txt | 1 + tests/dags/test_dagrun_short_circuit_false.py | 38 ----- tests/models.py | 77 +++++----- tests/operators/__init__.py | 2 + tests/operators/latest_only_operator.py | 2 +- tests/operators/python_operator.py | 167 ++++++++++++++++++++- 9 files changed, 301 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15fd4d98/tests/models.py ---------------------------------------------------------------------- diff --cc tests/models.py index 43fccca,3e77894..a013f8a --- a/tests/models.py +++ b/tests/models.py @@@ -223,59 -220,10 +220,43 @@@ class DagRunTest(unittest.TestCase) def test_id_for_date(self): run_id = models.DagRun.id_for_date( datetime.datetime(2015, 1, 2, 3, 4, 5, 6, None)) - self.assertEqual('scheduled__2015-01-02T03:04:05', run_id, - msg='Generated run_id did not match expectations: {0}' - .format(run_id)) + self.assertEqual( + 'scheduled__2015-01-02T03:04:05', run_id, + 'Generated run_id did not match expectations: {0}'.format(run_id)) + + def test_dagrun_find(self): + session = settings.Session() + now = datetime.datetime.now() + + dag_id1 = "test_dagrun_find_externally_triggered" + dag_run = models.DagRun( + dag_id=dag_id1, + run_id='manual__' + now.isoformat(), + execution_date=now, + start_date=now, + state=State.RUNNING, + external_trigger=True, + ) + session.add(dag_run) + + dag_id2 = "test_dagrun_find_not_externally_triggered" + dag_run = models.DagRun( + dag_id=dag_id2, + run_id='manual__' + now.isoformat(), + execution_date=now, + start_date=now, + state=State.RUNNING, + external_trigger=False, + ) + session.add(dag_run) + + session.commit() + + self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id1, external_trigger=True))) + self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id1, external_trigger=False))) + self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id2, external_trigger=True))) + self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id2, external_trigger=False))) - def test_dagrun_running_when_upstream_skipped(self): - """ - Tests that a DAG run is not failed when an upstream task is skipped - """ - initial_task_states = { - 'test_short_circuit_false': State.SUCCESS, - 'test_state_skipped1': State.SKIPPED, - 'test_state_skipped2': State.NONE, - } - # dags/test_dagrun_short_circuit_false.py - dag_run = self.create_dag_run('test_dagrun_short_circuit_false', - state=State.RUNNING, - task_states=initial_task_states) - updated_dag_state = dag_run.update_state() - self.assertEqual(State.RUNNING, updated_dag_state) - def test_dagrun_success_when_all_skipped(self): """ Tests that a DAG run succeeds when all tasks are skipped
