Repository: incubator-airflow
Updated Branches:
  refs/heads/master e4b240fb7 -> 5fe25d859


[AIRFLOW-1334] Check if tasks are backfill on scheduler in a join

Closes #2384 from saguziel/aguziel-use-join-apache


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5fe25d85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5fe25d85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5fe25d85

Branch: refs/heads/master
Commit: 5fe25d8598b23b8f641911242fad2061dddbfeec
Parents: e4b240f
Author: Alex Guziel <[email protected]>
Authored: Tue Jun 27 11:58:51 2017 -0700
Committer: Alex Guziel <[email protected]>
Committed: Tue Jun 27 11:58:51 2017 -0700

----------------------------------------------------------------------
 airflow/jobs.py   | 34 ++++++++++-----------
 airflow/models.py |  6 ++--
 tests/jobs.py     | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++
 tests/models.py   |  8 +++++
 4 files changed, 108 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5fe25d85/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 2b4350e..f8ab1fa 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -35,8 +35,7 @@ import time
 from time import sleep
 
 import psutil
-from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_
-from sqlalchemy import update
+from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_, not_
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import make_transient
 from tabulate import tabulate
@@ -990,11 +989,21 @@ class SchedulerJob(BaseJob):
         # Get all the queued task instances from associated with scheduled
         # DagRuns.
         TI = models.TaskInstance
+        DR = models.DagRun
+        DM = models.DagModel
         task_instances_to_examine = (
             session
             .query(TI)
             .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
             .filter(TI.state.in_(states))
+            .outerjoin(DR,
+                and_(DR.dag_id == TI.dag_id,
+                     DR.execution_date == TI.execution_date))
+            .filter(or_(DR.run_id == None,
+                    not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
+            .outerjoin(DM, DM.dag_id==TI.dag_id)
+            .filter(or_(DM.dag_id == None,
+                    not_(DM.is_paused)))
             .all()
         )
 
@@ -1043,21 +1052,6 @@ class SchedulerJob(BaseJob):
                     # Can't schedule any more since there are no more open slots.
                     break
 
-                if self.executor.has_task(task_instance):
-                    self.logger.debug("Not handling task {} as the executor reports it is running"
-                                      .format(task_instance.key))
-                    continue
-
-                if simple_dag_bag.get_dag(task_instance.dag_id).is_paused:
-                    self.logger.info("Not executing queued {} since {} is paused"
-                                     .format(task_instance, task_instance.dag_id))
-                    continue
-
-                # todo: remove this logic when backfills will be part of the scheduler
-                dag_run = task_instance.get_dagrun()
-                if dag_run and dag_run.is_backfill:
-                    continue
-
                 # Check to make sure that the task concurrency of the DAG hasn't been
                 # reached.
                 dag_id = task_instance.dag_id
@@ -1087,6 +1081,12 @@ class SchedulerJob(BaseJob):
                                              task_concurrency_limit))
                     continue
 
+
+                if self.executor.has_task(task_instance):
+                    self.logger.debug("Not handling task {} as the executor reports it is running"
+                                      .format(task_instance.key))
+                    continue
+
                 command = " ".join(TI.generate_command(
                     task_instance.dag_id,
                     task_instance.task_id,

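In plain terms, the jobs.py change above moves the paused-DAG and backfill-run checks out of the per-task Python loop and into the task-instance query itself, using two outer joins (the executor.has_task check is only relocated further down the loop). Below is a minimal, self-contained sketch of that join pattern. It uses simplified stand-in tables rather than Airflow's real models, assumes SQLAlchemy 1.4+, and uses "backfill_" as a stand-in for BackfillJob.ID_PREFIX; it is an illustration, not the committed implementation.

# Minimal sketch of the outer-join filtering introduced above.
# The models are simplified stand-ins for Airflow's TaskInstance, DagRun and
# DagModel tables; "backfill_" stands in for BackfillJob.ID_PREFIX.
# Assumes SQLAlchemy 1.4+ (sqlalchemy.orm.declarative_base, Session context manager).
from datetime import datetime

from sqlalchemy import (Boolean, Column, DateTime, String, and_, create_engine,
                        not_, or_)
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()
BACKFILL_PREFIX = "backfill_"


class TaskInstance(Base):
    __tablename__ = "task_instance"
    task_id = Column(String, primary_key=True)
    dag_id = Column(String, primary_key=True)
    execution_date = Column(DateTime, primary_key=True)
    state = Column(String)


class DagRun(Base):
    __tablename__ = "dag_run"
    dag_id = Column(String, primary_key=True)
    execution_date = Column(DateTime, primary_key=True)
    run_id = Column(String)


class DagModel(Base):
    __tablename__ = "dag"
    dag_id = Column(String, primary_key=True)
    is_paused = Column(Boolean, default=False)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    now = datetime(2017, 6, 27)
    session.add_all([
        DagModel(dag_id="regular_dag", is_paused=False),
        DagModel(dag_id="paused_dag", is_paused=True),
        DagRun(dag_id="regular_dag", execution_date=now, run_id="scheduled__1"),
        DagRun(dag_id="backfill_dag", execution_date=now,
               run_id=BACKFILL_PREFIX + "2017-06-27"),
        TaskInstance(task_id="t", dag_id="regular_dag", execution_date=now, state="scheduled"),
        TaskInstance(task_id="t", dag_id="paused_dag", execution_date=now, state="scheduled"),
        TaskInstance(task_id="t", dag_id="backfill_dag", execution_date=now, state="scheduled"),
    ])
    session.flush()

    TI, DR, DM = TaskInstance, DagRun, DagModel
    runnable = (
        session.query(TI)
        .filter(TI.state == "scheduled")
        # Outer join keeps task instances that have no DagRun at all...
        .outerjoin(DR, and_(DR.dag_id == TI.dag_id,
                            DR.execution_date == TI.execution_date))
        # ...while instances belonging to a backfill run are dropped in SQL.
        .filter(or_(DR.run_id == None,  # noqa: E711, renders as "IS NULL"
                    not_(DR.run_id.like(BACKFILL_PREFIX + '%'))))
        # Same pattern for paused DAGs: unknown DAGs pass, paused ones do not.
        .outerjoin(DM, DM.dag_id == TI.dag_id)
        .filter(or_(DM.dag_id == None, not_(DM.is_paused)))  # noqa: E711
        .all()
    )
    # Only the task on regular_dag survives both filters.
    print([ti.dag_id for ti in runnable])
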
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5fe25d85/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 41ad9f8..8566b7f 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4349,10 +4349,8 @@ class DagRun(Base):
 
     @property
     def is_backfill(self):
-        if "backfill" in self.run_id:
-            return True
-
-        return False
+        from airflow.jobs import BackfillJob
+        return self.run_id.startswith(BackfillJob.ID_PREFIX)
 
     @classmethod
     @provide_session

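The models.py change above narrows DagRun.is_backfill from a substring test to a prefix test, so a run whose ID merely contains the word "backfill" is no longer misclassified. A small illustration on plain strings (assuming BackfillJob.ID_PREFIX is "backfill_"; the run IDs below are made up):

# Old vs. new behaviour of DagRun.is_backfill, shown on plain strings.
# "backfill_" is assumed to be BackfillJob.ID_PREFIX; the run_ids are hypothetical.
ID_PREFIX = "backfill_"
run_ids = [
    "backfill_2017-06-27T00:00:00",    # genuine backfill run
    "manual__rerun_after_backfill",    # only mentions the word "backfill"
    "scheduled__2017-06-27T00:00:00",  # ordinary scheduled run
]
print(["backfill" in r for r in run_ids])          # old check: [True, True, False]
print([r.startswith(ID_PREFIX) for r in run_ids])  # new check: [True, False, False]
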
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5fe25d85/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 5c04b05..824cd9d 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -636,6 +636,87 @@ class SchedulerJobTest(unittest.TestCase):
         scheduler.heartrate = 0
         scheduler.run()
 
+    def test_execute_task_instances_is_paused_wont_execute(self):
+        dag_id = 'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute'
+        task_id_1 = 'dummy_task'
+
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
+        task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        dagbag = SimpleDagBag([dag])
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+
+        dr1 = scheduler.create_dag_run(dag)
+        ti1 = TI(task1, DEFAULT_DATE)
+        ti1.state = State.SCHEDULED
+        dr1.state = State.RUNNING
+        dagmodel = models.DagModel()
+        dagmodel.dag_id = dag_id
+        dagmodel.is_paused = True
+        session.merge(ti1)
+        session.merge(dr1)
+        session.add(dagmodel)
+        session.commit()
+
+        scheduler._execute_task_instances(dagbag, [State.SCHEDULED])
+        ti1.refresh_from_db()
+        self.assertEquals(State.SCHEDULED, ti1.state)
+
+    def test_execute_task_instances_no_dagrun_task_will_execute(self):
+        """
+        Tests that tasks without dagrun still get executed.
+        """
+        dag_id = 'SchedulerJobTest.test_execute_task_instances_no_dagrun_task_will_execute'
+        task_id_1 = 'dummy_task'
+
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
+        task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        dagbag = SimpleDagBag([dag])
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+
+        dr1 = scheduler.create_dag_run(dag)
+        ti1 = TI(task1, DEFAULT_DATE)
+        ti1.state = State.SCHEDULED
+        ti1.execution_date = ti1.execution_date + datetime.timedelta(days=1)
+        session.merge(ti1)
+        session.commit()
+
+        scheduler._execute_task_instances(dagbag, [State.SCHEDULED])
+        ti1.refresh_from_db()
+        self.assertEquals(State.QUEUED, ti1.state)
+
+    def test_execute_task_instances_backfill_tasks_wont_execute(self):
+        """
+        Tests that backfill tasks won't get executed.
+        """
+        dag_id = 'SchedulerJobTest.test_execute_task_instances_backfill_tasks_wont_execute'
+        task_id_1 = 'dummy_task'
+
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
+        task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        dagbag = SimpleDagBag([dag])
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+
+        dr1 = scheduler.create_dag_run(dag)
+        dr1.run_id = BackfillJob.ID_PREFIX + '_blah'
+        ti1 = TI(task1, dr1.execution_date)
+        ti1.refresh_from_db()
+        ti1.state = State.SCHEDULED
+        session.merge(ti1)
+        session.merge(dr1)
+        session.commit()
+
+        self.assertTrue(dr1.is_backfill)
+
+        scheduler._execute_task_instances(dagbag, [State.SCHEDULED])
+        ti1.refresh_from_db()
+        self.assertEquals(State.SCHEDULED, ti1.state)
+
     def test_concurrency(self):
         dag_id = 'SchedulerJobTest.test_concurrency'
         task_id_1 = 'dummy_task'

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5fe25d85/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 931a6aa..400c659 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -25,6 +25,7 @@ import time
 
 from airflow import models, settings, AirflowException
 from airflow.exceptions import AirflowSkipException
+from airflow.jobs import BackfillJob
 from airflow.models import DAG, TaskInstance as TI
 from airflow.models import State as ST
 from airflow.models import DagModel, DagStat
@@ -533,6 +534,13 @@ class DagRunTest(unittest.TestCase):
             if dagrun.dag_id == 'test_latest_runs_1':
                 self.assertEqual(dagrun.execution_date, datetime.datetime(2015, 1, 2))
 
+    def test_is_backfill(self):
+        dag = DAG(dag_id='test_is_backfill', start_date=DEFAULT_DATE)
+        dagrun = self.create_dag_run(dag, execution_date=DEFAULT_DATE)
+        dagrun.run_id = BackfillJob.ID_PREFIX + '_sfddsffds'
+        dagrun2 = self.create_dag_run(dag, execution_date=DEFAULT_DATE + datetime.timedelta(days=1))
+        self.assertTrue(dagrun.is_backfill)
+        self.assertFalse(dagrun2.is_backfill)
 
 class DagBagTest(unittest.TestCase):
 
