Repository: incubator-airflow Updated Branches: refs/heads/master 293365588 -> 91cd6bf72
[AIRFLOW-647] Restore dag.get_active_runs Simply added a getter back to dag that returns the list of active dag run execution dates for the dag from the DB. Closes #1899 from btallman/RestoreActiveRuns_feature Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/91cd6bf7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/91cd6bf7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/91cd6bf7 Branch: refs/heads/master Commit: 91cd6bf72dd9e0a1dc1a042d2390369eb71c2acc Parents: 2933655 Author: Benjamin Tallman <[email protected]> Authored: Thu Dec 1 10:09:31 2016 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Thu Dec 1 10:09:39 2016 +0100 ---------------------------------------------------------------------- airflow/models.py | 21 ++++++++++++++++++++ tests/jobs.py | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/91cd6bf7/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index d2d1e0b..02e4046 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -2799,6 +2799,27 @@ class DAG(BaseDag, LoggingMixin): DagModel.dag_id == self.dag_id) return qry.value('is_paused') + @provide_session + def get_active_runs(self, session=None): + """ + Returns the execution dates of the "running" dag runs for this dag + :param session: + :return: List of execution dates + """ + runs = ( + session.query(DagRun) + .filter( + DagRun.dag_id == self.dag_id, + DagRun.state == State.RUNNING) + .order_by(DagRun.execution_date) + .all()) + + active_dates = [] + for run in runs: + active_dates.append(run.execution_date) + + return active_dates + @property def latest_execution_date(self): """ 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/91cd6bf7/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index bb74709..62e88e5 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -948,3 +948,57 @@ class SchedulerJobTest(unittest.TestCase): session = settings.Session() self.assertEqual( len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1) + + def test_dag_get_active_runs(self): + """ + Test to check that a DAG returns its active runs + """ + + now = datetime.datetime.now() + six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(minute=0, second=0, microsecond=0) + + START_DATE = six_hours_ago_to_the_hour + DAG_NAME1 = 'get_active_runs_test' + + default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': START_DATE + + } + dag1 = DAG(DAG_NAME1, + schedule_interval='* * * * *', + max_active_runs=1, + default_args=default_args + ) + + run_this_1 = DummyOperator(task_id='run_this_1', dag=dag1) + run_this_2 = DummyOperator(task_id='run_this_2', dag=dag1) + run_this_2.set_upstream(run_this_1) + run_this_3 = DummyOperator(task_id='run_this_3', dag=dag1) + run_this_3.set_upstream(run_this_2) + + session = settings.Session() + orm_dag = DagModel(dag_id=dag1.dag_id) + session.merge(orm_dag) + session.commit() + session.close() + + scheduler = SchedulerJob() + dag1.clear() + + dr = scheduler.create_dag_run(dag1) + + # We had better get a dag run + self.assertIsNotNone(dr) + + execution_date = dr.execution_date + + running_dates = dag1.get_active_runs() + + try: + running_date = running_dates[0] + except: + running_date = 'Except' + + self.assertEqual(execution_date, running_date, 'Running Date must match Execution Date')
