Repository: incubator-airflow
Updated Branches:
  refs/heads/master 7c1d7db3d -> 0e892ccd7


[AIRFLOW-2466] consider task_id in _change_state_for_tis_without_dagrun

Closes #3360 from gwax/AF2466


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0e892ccd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0e892ccd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0e892ccd

Branch: refs/heads/master
Commit: 0e892ccd7092c6d63b73b418097be2185c0d00e3
Parents: 7c1d7db
Author: George Leslie-Waksman <[email protected]>
Authored: Wed May 30 10:59:01 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Wed May 30 10:59:01 2018 +0200

----------------------------------------------------------------------
 airflow/jobs.py |  1 +
 tests/jobs.py   | 42 ++++++++++++++++++++++++++++--------------
 2 files changed, 29 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0e892ccd/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 810ee47..a019879 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1014,6 +1014,7 @@ class SchedulerJob(BaseJob):
                 .query(models.TaskInstance) \
                 .filter(and_(
                     models.TaskInstance.dag_id == subq.c.dag_id,
+                    models.TaskInstance.task_id == subq.c.task_id,
                     models.TaskInstance.execution_date ==
                     subq.c.execution_date)) \
                 .update({models.TaskInstance.state: new_state},

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0e892ccd/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index c7e1810..504d149 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -1611,13 +1611,17 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertEqual(State.QUEUED, ti.state)
 
     def test_change_state_for_tis_without_dagrun(self):
-        dag = DAG(
+        dag1 = DAG(
             dag_id='test_change_state_for_tis_without_dagrun',
             start_date=DEFAULT_DATE)
 
         DummyOperator(
             task_id='dummy',
-            dag=dag,
+            dag=dag1,
+            owner='airflow')
+        DummyOperator(
+            task_id='dummy_b',
+            dag=dag1,
             owner='airflow')
 
         dag2 = DAG(
@@ -1639,7 +1643,7 @@ class SchedulerJobTest(unittest.TestCase):
             owner='airflow')
 
         session = settings.Session()
-        dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX,
+        dr1 = dag1.create_dagrun(run_id=DagRun.ID_PREFIX,
                                state=State.RUNNING,
                                execution_date=DEFAULT_DATE,
                                start_date=DEFAULT_DATE,
@@ -1651,8 +1655,10 @@ class SchedulerJobTest(unittest.TestCase):
                                  start_date=DEFAULT_DATE,
                                  session=session)
 
-        ti = dr.get_task_instance(task_id='dummy', session=session)
-        ti.state = State.SCHEDULED
+        ti1a = dr1.get_task_instance(task_id='dummy', session=session)
+        ti1a.state = State.SCHEDULED
+        ti1b = dr1.get_task_instance(task_id='dummy_b', session=session)
+        ti1b.state = State.SUCCESS
         session.commit()
 
         ti2 = dr2.get_task_instance(task_id='dummy', session=session)
@@ -1664,7 +1670,7 @@ class SchedulerJobTest(unittest.TestCase):
         session.merge(ti3)
         session.commit()
 
-        dagbag = self._make_simple_dag_bag([dag, dag2, dag3])
+        dagbag = self._make_simple_dag_bag([dag1, dag2, dag3])
         scheduler = SchedulerJob(num_runs=0, run_duration=0)
         scheduler._change_state_for_tis_without_dagrun(
             simple_dag_bag=dagbag,
@@ -1672,9 +1678,13 @@ class SchedulerJobTest(unittest.TestCase):
             new_state=State.NONE,
             session=session)
 
-        ti = dr.get_task_instance(task_id='dummy', session=session)
-        ti.refresh_from_db(session=session)
-        self.assertEqual(ti.state, State.SCHEDULED)
+        ti1a = dr1.get_task_instance(task_id='dummy', session=session)
+        ti1a.refresh_from_db(session=session)
+        self.assertEqual(ti1a.state, State.SCHEDULED)
+
+        ti1b = dr1.get_task_instance(task_id='dummy_b', session=session)
+        ti1b.refresh_from_db(session=session)
+        self.assertEqual(ti1b.state, State.SUCCESS)
 
         ti2 = dr2.get_task_instance(task_id='dummy', session=session)
         ti2.refresh_from_db(session=session)
@@ -1683,11 +1693,11 @@ class SchedulerJobTest(unittest.TestCase):
         ti3.refresh_from_db(session=session)
         self.assertEquals(ti3.state, State.NONE)
 
-        dr.refresh_from_db(session=session)
-        dr.state = State.FAILED
+        dr1.refresh_from_db(session=session)
+        dr1.state = State.FAILED
 
         # why o why
-        session.merge(dr)
+        session.merge(dr1)
         session.commit()
 
         scheduler._change_state_for_tis_without_dagrun(
@@ -1695,8 +1705,12 @@ class SchedulerJobTest(unittest.TestCase):
             old_states=[State.SCHEDULED, State.QUEUED],
             new_state=State.NONE,
             session=session)
-        ti.refresh_from_db(session=session)
-        self.assertEqual(ti.state, State.NONE)
+        ti1a.refresh_from_db(session=session)
+        self.assertEqual(ti1a.state, State.NONE)
+
+        # don't touch ti1b
+        ti1b.refresh_from_db(session=session)
+        self.assertEqual(ti1b.state, State.SUCCESS)
 
         # don't touch ti2
         ti2.refresh_from_db(session=session)
