Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-test 3c8939838 -> 5800f5656


[AIRFLOW-1142] Do not reset orphaned state for backfills

The scheduler could interfere with backfills when
it resets the state
of tasks that were considered orphaned. This patch
prevents the scheduler
from doing so and adds a guard in the backfill.

Closes #2260 from bolkedebruin/AIRFLOW-1142

(cherry picked from commit 4e79b830e3261b9d54fdbc7c9dcb510d36565986)
Signed-off-by: Bolke de Bruin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5800f565
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5800f565
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5800f565

Branch: refs/heads/v1-8-test
Commit: 5800f565628d11d8ea504468bcc14c4d1c0da10c
Parents: 3c89398
Author: Bolke de Bruin <[email protected]>
Authored: Thu Apr 27 21:17:25 2017 +0200
Committer: Bolke de Bruin <[email protected]>
Committed: Thu Apr 27 21:17:39 2017 +0200

----------------------------------------------------------------------
 airflow/jobs.py   | 10 +++++++++-
 airflow/models.py | 10 +++++++++-
 tests/jobs.py     | 42 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 60 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5800f565/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 9a6687c..11dbddf 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1359,7 +1359,8 @@ class SchedulerJob(BaseJob):
         active_runs = DagRun.find(
             state=State.RUNNING,
             external_trigger=False,
-            session=session
+            session=session,
+            no_backfills=True,
         )
         for dr in active_runs:
             self.logger.info("Resetting {} {}".format(dr.dag_id,
@@ -1856,6 +1857,13 @@ class BackfillJob(BaseJob):
                     self.logger.debug("Task instance to run {} state {}"
                                       .format(ti, ti.state))
 
+                    # guard against externally modified tasks instances or
+                    # in case max concurrency has been reached at task runtime
+                    if ti.state == State.NONE:
+                        self.logger.warning("FIXME: task instance {} state was set to "
+                                            "None externally. This should not happen")
+                        ti.set_state(State.SCHEDULED, session=session)
+
                     # The task was already marked successful or skipped by a
                     # different Job. Don't rerun it.
                     if ti.state == State.SUCCESS:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5800f565/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 5db0287..2de88f6 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3931,7 +3931,8 @@ class DagRun(Base):
     @staticmethod
     @provide_session
     def find(dag_id=None, run_id=None, execution_date=None,
-             state=None, external_trigger=None, session=None):
+             state=None, external_trigger=None, no_backfills=False,
+             session=None):
         """
         Returns a set of dag runs for the given search criteria.
         :param dag_id: the dag_id to find dag runs for
@@ -3944,6 +3945,9 @@ class DagRun(Base):
         :type state: State
         :param external_trigger: whether this dag run is externally triggered
         :type external_trigger: bool
+        :param no_backfills: return no backfills (True), return all (False). Defaults to False
+        :type no_backfills: bool
         :param session: database session
         :type session: Session
         """
@@ -3963,6 +3967,10 @@ class DagRun(Base):
             qry = qry.filter(DR.state == state)
         if external_trigger is not None:
             qry = qry.filter(DR.external_trigger == external_trigger)
+        if no_backfills:
+            # in order to prevent a circular dependency
+            from airflow.jobs import BackfillJob
+            qry = qry.filter(DR.run_id.notlike(BackfillJob.ID_PREFIX + '%'))
 
         dr = qry.order_by(DR.execution_date).all()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5800f565/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 5db858d..2501bdb 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -612,6 +612,48 @@ class SchedulerJobTest(unittest.TestCase):
 
         session.close()
 
+    def test_execute_helper_reset_orphaned_tasks(self):
+        session = settings.Session()
+        dag = DAG(
+            'test_execute_helper_reset_orphaned_tasks',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = DummyOperator(task_id='op1')
+
+        dag.clear()
+        dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX,
+                               state=State.RUNNING,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE,
+                               session=session)
+        dr2 = dag.create_dagrun(run_id=BackfillJob.ID_PREFIX,
+                                state=State.RUNNING,
+                                execution_date=DEFAULT_DATE + datetime.timedelta(1),
+                                start_date=DEFAULT_DATE,
+                                session=session)
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.SCHEDULED
+        ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
+        ti2.state = State.SCHEDULED
+        session.commit()
+
+        processor = mock.MagicMock()
+        processor.get_last_finish_time.return_value = None
+
+        scheduler = SchedulerJob(num_runs=0, run_duration=0)
+        executor = TestExecutor()
+        scheduler.executor = executor
+
+        scheduler._execute_helper(processor_manager=processor)
+
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        self.assertEqual(ti.state, State.NONE)
+
+        ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
+        self.assertEqual(ti2.state, State.SCHEDULED)
+
     @provide_session
     def evaluate_dagrun(
             self,

Reply via email to