Repository: incubator-airflow Updated Branches: refs/heads/v1-8-test 3c8939838 -> 5800f5656
[AIRFLOW-1142] Do not reset orphaned state for backfills The scheduler could interfere with backfills when it resets the state of tasks that were considered orphaned. This patch prevents the scheduler from doing so and adds a guard in the backfill. Closes #2260 from bolkedebruin/AIRFLOW-1142 (cherry picked from commit 4e79b830e3261b9d54fdbc7c9dcb510d36565986) Signed-off-by: Bolke de Bruin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5800f565 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5800f565 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5800f565 Branch: refs/heads/v1-8-test Commit: 5800f565628d11d8ea504468bcc14c4d1c0da10c Parents: 3c89398 Author: Bolke de Bruin <[email protected]> Authored: Thu Apr 27 21:17:25 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Thu Apr 27 21:17:39 2017 +0200 ---------------------------------------------------------------------- airflow/jobs.py | 10 +++++++++- airflow/models.py | 10 +++++++++- tests/jobs.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5800f565/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 9a6687c..11dbddf 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1359,7 +1359,8 @@ class SchedulerJob(BaseJob): active_runs = DagRun.find( state=State.RUNNING, external_trigger=False, - session=session + session=session, + no_backfills=True, ) for dr in active_runs: self.logger.info("Resetting {} {}".format(dr.dag_id, @@ -1856,6 +1857,13 @@ class BackfillJob(BaseJob): self.logger.debug("Task instance to run {} state {}" .format(ti, ti.state)) + # guard against externally 
modified task instances or + # in case max concurrency has been reached at task runtime + if ti.state == State.NONE: + self.logger.warning("FIXME: task instance {} state was set to " + "None externally. This should not happen") + ti.set_state(State.SCHEDULED, session=session) + # The task was already marked successful or skipped by a # different Job. Don't rerun it. if ti.state == State.SUCCESS: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5800f565/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 5db0287..2de88f6 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -3931,7 +3931,8 @@ class DagRun(Base): @staticmethod @provide_session def find(dag_id=None, run_id=None, execution_date=None, - state=None, external_trigger=None, session=None): + state=None, external_trigger=None, no_backfills=False, + session=None): """ Returns a set of dag runs for the given search criteria. :param dag_id: the dag_id to find dag runs for @@ -3944,6 +3945,9 @@ class DagRun(Base): :type state: State :param external_trigger: whether this dag run is externally triggered :type external_trigger: bool + :param no_backfills: return no backfills (True), return all (False). 
+ Defaults to False + :type no_backfills: bool :param session: database session :type session: Session """ @@ -3963,6 +3967,10 @@ class DagRun(Base): qry = qry.filter(DR.state == state) if external_trigger is not None: qry = qry.filter(DR.external_trigger == external_trigger) + if no_backfills: + # in order to prevent a circular dependency + from airflow.jobs import BackfillJob + qry = qry.filter(DR.run_id.notlike(BackfillJob.ID_PREFIX + '%')) dr = qry.order_by(DR.execution_date).all() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5800f565/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 5db858d..2501bdb 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -612,6 +612,48 @@ class SchedulerJobTest(unittest.TestCase): session.close() + def test_execute_helper_reset_orphaned_tasks(self): + session = settings.Session() + dag = DAG( + 'test_execute_helper_reset_orphaned_tasks', + start_date=DEFAULT_DATE, + default_args={'owner': 'owner1'}) + + with dag: + op1 = DummyOperator(task_id='op1') + + dag.clear() + dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX, + state=State.RUNNING, + execution_date=DEFAULT_DATE, + start_date=DEFAULT_DATE, + session=session) + dr2 = dag.create_dagrun(run_id=BackfillJob.ID_PREFIX, + state=State.RUNNING, + execution_date=DEFAULT_DATE + datetime.timedelta(1), + start_date=DEFAULT_DATE, + session=session) + ti = dr.get_task_instance(task_id=op1.task_id, session=session) + ti.state = State.SCHEDULED + ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session) + ti2.state = State.SCHEDULED + session.commit() + + processor = mock.MagicMock() + processor.get_last_finish_time.return_value = None + + scheduler = SchedulerJob(num_runs=0, run_duration=0) + executor = TestExecutor() + scheduler.executor = executor + + scheduler._execute_helper(processor_manager=processor) + + ti = dr.get_task_instance(task_id=op1.task_id, session=session) + 
self.assertEqual(ti.state, State.NONE) + + ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session) + self.assertEqual(ti2.state, State.SCHEDULED) + @provide_session def evaluate_dagrun( self,
