Repository: incubator-airflow
Updated Branches:
  refs/heads/aguziel-use-join-apache [created] 436fe71c8
[AIRFLOW-1334] Check if tasks are backfill on scheduler in a join Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/436fe71c Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/436fe71c Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/436fe71c Branch: refs/heads/aguziel-use-join-apache Commit: 436fe71c89473e2da3b91e2d97166da06409a0f1 Parents: 9958aa9 Author: Alex Guziel <[email protected]> Authored: Wed Jun 21 14:56:36 2017 -0700 Committer: Alex Guziel <[email protected]> Committed: Wed Jun 21 14:57:43 2017 -0700 ---------------------------------------------------------------------- airflow/jobs.py | 11 ++++------- tests/jobs.py | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/436fe71c/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 2b4350e..e16c277 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -35,8 +35,7 @@ import time from time import sleep import psutil -from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_ -from sqlalchemy import update +from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_, not_, update from sqlalchemy.exc import OperationalError from sqlalchemy.orm.session import make_transient from tabulate import tabulate @@ -990,11 +989,14 @@ class SchedulerJob(BaseJob): # Get all the queued task instances from associated with scheduled # DagRuns. 
TI = models.TaskInstance + DR = models.DagRun task_instances_to_examine = ( session .query(TI) .filter(TI.dag_id.in_(simple_dag_bag.dag_ids)) .filter(TI.state.in_(states)) + .outerjoin(DR, and_(DR.dag_id==TI.dag_id, DR.execution_date==TI.execution_date)) + .filter(or_(DR.run_id == None, not_(DR.run_id.like('%backfill%')))) .all() ) @@ -1053,11 +1055,6 @@ class SchedulerJob(BaseJob): .format(task_instance, task_instance.dag_id)) continue - # todo: remove this logic when backfills will be part of the scheduler - dag_run = task_instance.get_dagrun() - if dag_run and dag_run.is_backfill: - continue - # Check to make sure that the task concurrency of the DAG hasn't been # reached. dag_id = task_instance.dag_id http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/436fe71c/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 5c04b05..e051183 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -636,6 +636,60 @@ class SchedulerJobTest(unittest.TestCase): scheduler.heartrate = 0 scheduler.run() + def test_execute_task_instances_no_dagrun_task_will_execute(self): + """ + Tests that tasks without dagrun still get executed. + """ + dag_id = 'SchedulerJobTest.test_concurrency' + task_id_1 = 'dummy_task' + + dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) + task1 = DummyOperator(dag=dag, task_id=task_id_1) + dagbag = SimpleDagBag([dag]) + + scheduler = SchedulerJob(**self.default_scheduler_args) + session = settings.Session() + + dr1 = scheduler.create_dag_run(dag) + ti1 = TI(task1, DEFAULT_DATE) + ti1.state = State.SCHEDULED + ti1.execution_date = ti1.execution_date + datetime.timedelta(days=1) + session.merge(ti1) + session.commit() + + scheduler._execute_task_instances(dagbag, [State.SCHEDULED]) + ti1.refresh_from_db() + self.assertEquals(State.QUEUED, ti1.state) + + def test_execute_task_instances_backfill_tasks_wont_execute(self): + """ + Tests that backfill tasks won't get executed. 
+ """ + dag_id = 'SchedulerJobTest.test_concurrency' + task_id_1 = 'dummy_task' + + dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) + task1 = DummyOperator(dag=dag, task_id=task_id_1) + dagbag = SimpleDagBag([dag]) + + scheduler = SchedulerJob(**self.default_scheduler_args) + session = settings.Session() + + dr1 = scheduler.create_dag_run(dag) + dr1.run_id = 'blaH_backfill_blah' + ti1 = TI(task1, dr1.execution_date) + ti1.refresh_from_db() + ti1.state = State.SCHEDULED + session.merge(ti1) + session.merge(dr1) + session.commit() + + self.assertTrue(dr1.is_backfill) + + scheduler._execute_task_instances(dagbag, [State.SCHEDULED]) + ti1.refresh_from_db() + self.assertEquals(State.SCHEDULED, ti1.state) + def test_concurrency(self): dag_id = 'SchedulerJobTest.test_concurrency' task_id_1 = 'dummy_task'
