Repository: incubator-airflow

Updated Branches:
  refs/heads/master e4b240fb7 -> 5fe25d859
[AIRFLOW-1334] Check if tasks are backfill on scheduler in a join

Closes #2384 from saguziel/aguziel-use-join-apache

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5fe25d85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5fe25d85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5fe25d85

Branch: refs/heads/master
Commit: 5fe25d8598b23b8f641911242fad2061dddbfeec
Parents: e4b240f
Author: Alex Guziel <[email protected]>
Authored: Tue Jun 27 11:58:51 2017 -0700
Committer: Alex Guziel <[email protected]>
Committed: Tue Jun 27 11:58:51 2017 -0700

----------------------------------------------------------------------
 airflow/jobs.py   | 34 ++++++++++-----------
 airflow/models.py |  6 ++--
 tests/jobs.py     | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++
 tests/models.py   |  8 +++++
 4 files changed, 108 insertions(+), 21 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5fe25d85/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 2b4350e..f8ab1fa 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -35,8 +35,7 @@ import time
 from time import sleep
 
 import psutil
-from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_
-from sqlalchemy import update
+from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_, not_
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import make_transient
 from tabulate import tabulate
@@ -990,11 +989,21 @@ class SchedulerJob(BaseJob):
         # Get all the queued task instances from associated with scheduled
         # DagRuns.
         TI = models.TaskInstance
+        DR = models.DagRun
+        DM = models.DagModel
         task_instances_to_examine = (
             session
             .query(TI)
             .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
             .filter(TI.state.in_(states))
+            .outerjoin(DR,
+                and_(DR.dag_id == TI.dag_id,
+                     DR.execution_date == TI.execution_date))
+            .filter(or_(DR.run_id == None,
+                not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
+            .outerjoin(DM, DM.dag_id==TI.dag_id)
+            .filter(or_(DM.dag_id == None,
+                not_(DM.is_paused)))
             .all()
         )
@@ -1043,21 +1052,6 @@ class SchedulerJob(BaseJob):
                 # Can't schedule any more since there are no more open slots.
                 break
 
-            if self.executor.has_task(task_instance):
-                self.logger.debug("Not handling task {} as the executor reports it is running"
-                                  .format(task_instance.key))
-                continue
-
-            if simple_dag_bag.get_dag(task_instance.dag_id).is_paused:
-                self.logger.info("Not executing queued {} since {} is paused"
-                                 .format(task_instance, task_instance.dag_id))
-                continue
-
-            # todo: remove this logic when backfills will be part of the scheduler
-            dag_run = task_instance.get_dagrun()
-            if dag_run and dag_run.is_backfill:
-                continue
-
             # Check to make sure that the task concurrency of the DAG hasn't been
             # reached.
             dag_id = task_instance.dag_id
@@ -1087,6 +1081,12 @@ class SchedulerJob(BaseJob):
                                            task_concurrency_limit))
                     continue
 
+            if self.executor.has_task(task_instance):
+                self.logger.debug("Not handling task {} as the executor reports it is running"
+                                  .format(task_instance.key))
+                continue
+
             command = " ".join(TI.generate_command(
                 task_instance.dag_id,
                 task_instance.task_id,
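The heart of the jobs.py change is that the per-task-instance Python checks (paused DAG, backfill DagRun) are folded into the initial query as outer joins. What follows is a minimal, self-contained sketch of that pattern, not Airflow's actual code: it assumes plain SQLAlchemy against an in-memory SQLite database, simplified stand-in tables, and a made-up 'backfill_' prefix in place of BackfillJob.ID_PREFIX.

# Hedged sketch of the join-based filtering idea, with stand-in tables and
# a stand-in backfill prefix; none of these names are Airflow's real schema.
from sqlalchemy import Column, Boolean, String, create_engine, and_, or_, not_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()
BACKFILL_PREFIX = 'backfill_'  # assumption: stands in for BackfillJob.ID_PREFIX


class TaskInstance(Base):
    __tablename__ = 'task_instance'
    task_id = Column(String, primary_key=True)
    dag_id = Column(String)
    execution_date = Column(String)


class DagRun(Base):
    __tablename__ = 'dag_run'
    run_id = Column(String, primary_key=True)
    dag_id = Column(String)
    execution_date = Column(String)


class DagModel(Base):
    __tablename__ = 'dag'
    dag_id = Column(String, primary_key=True)
    is_paused = Column(Boolean, default=False)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([
    # d1 has a backfill run on 01-01 and a scheduled run on 01-02.
    TaskInstance(task_id='t1', dag_id='d1', execution_date='2017-01-01'),
    TaskInstance(task_id='t2', dag_id='d1', execution_date='2017-01-02'),
    DagRun(run_id='backfill_2017-01-01', dag_id='d1', execution_date='2017-01-01'),
    DagRun(run_id='scheduled__2017-01-02', dag_id='d1', execution_date='2017-01-02'),
    DagModel(dag_id='d1', is_paused=False),
    # d2 is paused; d3's task instance has no DagRun or DagModel row at all.
    TaskInstance(task_id='t3', dag_id='d2', execution_date='2017-01-01'),
    DagModel(dag_id='d2', is_paused=True),
    TaskInstance(task_id='t4', dag_id='d3', execution_date='2017-01-01'),
])
session.commit()

# One query instead of per-row Python checks: LEFT OUTER JOINs keep task
# instances with no matching DagRun/DagModel row, while the or_() filters
# drop rows joined to a backfill run or to a paused DAG.
tis = (
    session.query(TaskInstance)
    .outerjoin(DagRun, and_(DagRun.dag_id == TaskInstance.dag_id,
                            DagRun.execution_date == TaskInstance.execution_date))
    .filter(or_(DagRun.run_id == None,  # noqa: E711
                not_(DagRun.run_id.like(BACKFILL_PREFIX + '%'))))
    .outerjoin(DagModel, DagModel.dag_id == TaskInstance.dag_id)
    .filter(or_(DagModel.dag_id == None,  # noqa: E711
                not_(DagModel.is_paused)))
    .all()
)
print(sorted(ti.task_id for ti in tis))  # expected: ['t2', 't4']

The outer joins are what keep a task instance with no DagRun or DagModel row schedulable: the or_(... == None, ...) clauses let those NULL-extended rows through while backfill and paused rows are excluded in SQL, which is exactly the behaviour the new scheduler tests below exercise.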
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5fe25d85/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 41ad9f8..8566b7f 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4349,10 +4349,8 @@ class DagRun(Base):
 
     @property
     def is_backfill(self):
-        if "backfill" in self.run_id:
-            return True
-
-        return False
+        from airflow.jobs import BackfillJob
+        return self.run_id.startswith(BackfillJob.ID_PREFIX)
 
     @classmethod
     @provide_session

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5fe25d85/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 5c04b05..824cd9d 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -636,6 +636,87 @@ class SchedulerJobTest(unittest.TestCase):
         scheduler.heartrate = 0
         scheduler.run()
 
+    def test_execute_task_instances_is_paused_wont_execute(self):
+        dag_id = 'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute'
+        task_id_1 = 'dummy_task'
+
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
+        task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        dagbag = SimpleDagBag([dag])
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+
+        dr1 = scheduler.create_dag_run(dag)
+        ti1 = TI(task1, DEFAULT_DATE)
+        ti1.state = State.SCHEDULED
+        dr1.state = State.RUNNING
+        dagmodel = models.DagModel()
+        dagmodel.dag_id = dag_id
+        dagmodel.is_paused = True
+        session.merge(ti1)
+        session.merge(dr1)
+        session.add(dagmodel)
+        session.commit()
+
+        scheduler._execute_task_instances(dagbag, [State.SCHEDULED])
+        ti1.refresh_from_db()
+        self.assertEquals(State.SCHEDULED, ti1.state)
+
+    def test_execute_task_instances_no_dagrun_task_will_execute(self):
+        """
+        Tests that tasks without dagrun still get executed.
+        """
+        dag_id = 'SchedulerJobTest.test_execute_task_instances_no_dagrun_task_will_execute'
+        task_id_1 = 'dummy_task'
+
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
+        task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        dagbag = SimpleDagBag([dag])
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+
+        dr1 = scheduler.create_dag_run(dag)
+        ti1 = TI(task1, DEFAULT_DATE)
+        ti1.state = State.SCHEDULED
+        ti1.execution_date = ti1.execution_date + datetime.timedelta(days=1)
+        session.merge(ti1)
+        session.commit()
+
+        scheduler._execute_task_instances(dagbag, [State.SCHEDULED])
+        ti1.refresh_from_db()
+        self.assertEquals(State.QUEUED, ti1.state)
+
+    def test_execute_task_instances_backfill_tasks_wont_execute(self):
+        """
+        Tests that backfill tasks won't get executed.
+        """
+        dag_id = 'SchedulerJobTest.test_execute_task_instances_backfill_tasks_wont_execute'
+        task_id_1 = 'dummy_task'
+
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
+        task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        dagbag = SimpleDagBag([dag])
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+
+        dr1 = scheduler.create_dag_run(dag)
+        dr1.run_id = BackfillJob.ID_PREFIX + '_blah'
+        ti1 = TI(task1, dr1.execution_date)
+        ti1.refresh_from_db()
+        ti1.state = State.SCHEDULED
+        session.merge(ti1)
+        session.merge(dr1)
+        session.commit()
+
+        self.assertTrue(dr1.is_backfill)
+
+        scheduler._execute_task_instances(dagbag, [State.SCHEDULED])
+        ti1.refresh_from_db()
+        self.assertEquals(State.SCHEDULED, ti1.state)
+
     def test_concurrency(self):
         dag_id = 'SchedulerJobTest.test_concurrency'
         task_id_1 = 'dummy_task'

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5fe25d85/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 931a6aa..400c659 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -25,6 +25,7 @@ import time
 
 from airflow import models, settings, AirflowException
 from airflow.exceptions import AirflowSkipException
+from airflow.jobs import BackfillJob
 from airflow.models import DAG, TaskInstance as TI
 from airflow.models import State as ST
 from airflow.models import DagModel, DagStat
@@ -533,6 +534,13 @@ class DagRunTest(unittest.TestCase):
             if dagrun.dag_id == 'test_latest_runs_1':
                 self.assertEqual(dagrun.execution_date, datetime.datetime(2015, 1, 2))
 
+    def test_is_backfill(self):
+        dag = DAG(dag_id='test_is_backfill', start_date=DEFAULT_DATE)
+        dagrun = self.create_dag_run(dag, execution_date=DEFAULT_DATE)
+        dagrun.run_id = BackfillJob.ID_PREFIX + '_sfddsffds'
+        dagrun2 = self.create_dag_run(dag, execution_date=DEFAULT_DATE + datetime.timedelta(days=1))
+        self.assertTrue(dagrun.is_backfill)
+        self.assertFalse(dagrun2.is_backfill)
 
 class DagBagTest(unittest.TestCase):
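The DagRun.is_backfill change above replaces the old substring test ("backfill" anywhere in run_id) with a prefix test against BackfillJob.ID_PREFIX, matching the run_id LIKE filter the scheduler now applies in its query. A small hedged illustration of the behavioural difference, using a made-up 'backfill_' prefix and made-up run_ids rather than importing BackfillJob:

# Hedged illustration of the is_backfill change: prefix match vs. substring
# match. 'backfill_' stands in for BackfillJob.ID_PREFIX; run_ids are made up.
BACKFILL_PREFIX = 'backfill_'


def is_backfill_old(run_id):
    # pre-change behaviour: any run_id containing "backfill" counted
    return "backfill" in run_id


def is_backfill_new(run_id):
    # post-change behaviour: only run_ids starting with the prefix count
    return run_id.startswith(BACKFILL_PREFIX)


for run_id in ('backfill_2017-06-27T00:00:00',
               'scheduled__2017-06-27T00:00:00',
               'manual__rerun_of_backfill'):
    print(run_id, is_backfill_old(run_id), is_backfill_new(run_id))
# backfill_2017-06-27T00:00:00 True True
# scheduled__2017-06-27T00:00:00 False False
# manual__rerun_of_backfill True False   <- only the substring test misfires

Keying both the property and the scheduler's run_id filter off the same prefix keeps the two checks consistent, which is what tests/models.py's test_is_backfill pins down.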
