Repository: incubator-airflow Updated Branches: refs/heads/master 1f3b60792 -> 26911864f
[AIRFLOW-790] Clean up TaskInstances without DagRuns Closes #2886 from gwax/AIRFLOW-790_guard_against_orphans Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/26911864 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/26911864 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/26911864 Branch: refs/heads/master Commit: 26911864fecb8cf160db732605b66e3243b10436 Parents: 1f3b607 Author: George Leslie-Waksman <[email protected]> Authored: Thu Jan 11 20:39:23 2018 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Thu Jan 11 20:39:23 2018 +0100 ---------------------------------------------------------------------- .gitignore | 3 +++ airflow/jobs.py | 51 +++++++++++++++++++++++++-------------------------- tests/jobs.py | 37 ++++++++++++++++++++++++++++--------- 3 files changed, 56 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/26911864/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 254ad15..e32065d 100644 --- a/.gitignore +++ b/.gitignore @@ -113,6 +113,9 @@ ENV/ # PyCharm .idea/ +# Visual Studio Code +.vscode/ + # vim *.swp http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/26911864/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 6ed3f31..ae6969e 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -949,8 +949,9 @@ class SchedulerJob(BaseJob): """ For all DAG IDs in the SimpleDagBag, look for task instances in the old_states and set them to new_state if the corresponding DagRun - exists but is not in the running state. This normally should not - happen, but it can if the state of DagRuns are changed manually. 
+ does not exist or exists but is not in the running state. This + normally should not happen, but it can if the state of DagRuns are + changed manually. :param old_states: examine TaskInstances in this state :type old_state: list[State] @@ -961,35 +962,33 @@ class SchedulerJob(BaseJob): :type simple_dag_bag: SimpleDagBag """ tis_changed = 0 + query = session \ + .query(models.TaskInstance) \ + .outerjoin(models.DagRun, and_( + models.TaskInstance.dag_id == models.DagRun.dag_id, + models.TaskInstance.execution_date == models.DagRun.execution_date)) \ + .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids)) \ + .filter(models.TaskInstance.state.in_(old_states)) \ + .filter(or_( + models.DagRun.state != State.RUNNING, + models.DagRun.state.is_(None))) if self.using_sqlite: - tis_to_change = ( - session - .query(models.TaskInstance) - .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids)) - .filter(models.TaskInstance.state.in_(old_states)) - .filter(and_( - models.DagRun.dag_id == models.TaskInstance.dag_id, - models.DagRun.execution_date == models.TaskInstance.execution_date, - models.DagRun.state != State.RUNNING)) - .with_for_update() - .all() - ) + tis_to_change = query \ + .with_for_update() \ + .all() for ti in tis_to_change: ti.set_state(new_state, session=session) tis_changed += 1 else: - tis_changed = ( - session - .query(models.TaskInstance) - .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids)) - .filter(models.TaskInstance.state.in_(old_states)) - .filter(and_( - models.DagRun.dag_id == models.TaskInstance.dag_id, - models.DagRun.execution_date == models.TaskInstance.execution_date, - models.DagRun.state != State.RUNNING)) - .update({models.TaskInstance.state: new_state}, - synchronize_session=False) - ) + subq = query.subquery() + tis_changed = session \ + .query(models.TaskInstance) \ + .filter(and_( + models.TaskInstance.dag_id == subq.c.dag_id, + models.TaskInstance.execution_date == + subq.c.execution_date)) \ + 
.update({models.TaskInstance.state: new_state}, + synchronize_session=False) session.commit() if tis_changed > 0: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/26911864/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 5d0420a..e522de5 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -1531,6 +1531,15 @@ class SchedulerJobTest(unittest.TestCase): dag=dag2, owner='airflow') + dag3 = DAG( + dag_id='test_change_state_for_tis_without_dagrun_no_dagrun', + start_date=DEFAULT_DATE) + + DummyOperator( + task_id='dummy', + dag=dag3, + owner='airflow') + session = settings.Session() dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX, state=State.RUNNING, @@ -1552,12 +1561,18 @@ class SchedulerJobTest(unittest.TestCase): ti2.state = State.SCHEDULED session.commit() - dagbag = self._make_simple_dag_bag([dag]) + ti3 = TI(dag3.get_task('dummy'), DEFAULT_DATE) + ti3.state = State.SCHEDULED + session.merge(ti3) + session.commit() + + dagbag = self._make_simple_dag_bag([dag, dag2, dag3]) scheduler = SchedulerJob(num_runs=0, run_duration=0) - scheduler._change_state_for_tis_without_dagrun(simple_dag_bag=dagbag, - old_states=[State.SCHEDULED, State.QUEUED], - new_state=State.NONE, - session=session) + scheduler._change_state_for_tis_without_dagrun( + simple_dag_bag=dagbag, + old_states=[State.SCHEDULED, State.QUEUED], + new_state=State.NONE, + session=session) ti = dr.get_task_instance(task_id='dummy', session=session) ti.refresh_from_db(session=session) @@ -1567,6 +1582,9 @@ class SchedulerJobTest(unittest.TestCase): ti2.refresh_from_db(session=session) self.assertEqual(ti2.state, State.SCHEDULED) + ti3.refresh_from_db(session=session) + self.assertEquals(ti3.state, State.NONE) + dr.refresh_from_db(session=session) dr.state = State.FAILED @@ -1574,10 +1592,11 @@ class SchedulerJobTest(unittest.TestCase): session.merge(dr) session.commit() - 
scheduler._change_state_for_tis_without_dagrun(simple_dag_bag=dagbag, - old_states=[State.SCHEDULED, State.QUEUED], - new_state=State.NONE, - session=session) + scheduler._change_state_for_tis_without_dagrun( + simple_dag_bag=dagbag, + old_states=[State.SCHEDULED, State.QUEUED], + new_state=State.NONE, + session=session) ti.refresh_from_db(session=session) self.assertEqual(ti.state, State.NONE)
