Repository: incubator-airflow Updated Branches: refs/heads/master 8754cb1c6 -> 8c42d03c4
[AIRFLOW-1460] Allow restoration of REMOVED TI's When a task instance exists in the database but its corresponding task no longer exists in the DAG, the scheduler marks the task instance as REMOVED. Once removed, task instances stayed removed forever, even if the task was later added back to the DAG. This change allows for the restoration of REMOVED task instances. If a task instance is in state REMOVED but the corresponding task is present in the DAG, restore the task instance by setting its state to NONE. A new unit test simulates the removal and restoration of a task from a DAG and verifies that the task instance is restored: `./run_unit_tests.sh tests.models:DagRunTest` JIRA: https://issues.apache.org/jira/browse/AIRFLOW-1460 Closes #3137 from astahlman/airflow-1460-restore-tis Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8c42d03c Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8c42d03c Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8c42d03c Branch: refs/heads/master Commit: 8c42d03c4e35a0046e46f0e2e6db588702ee7e8b Parents: 8754cb1 Author: Andrew Stahlman <andrew.stahl...@gmail.com> Authored: Wed Mar 21 23:54:05 2018 -0700 Committer: Maxime Beauchemin <maximebeauche...@gmail.com> Committed: Wed Mar 21 23:54:05 2018 -0700 ---------------------------------------------------------------------- airflow/models.py | 21 ++++++++++++++++++--- tests/models.py | 24 ++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8c42d03c/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index c1b608a..aa10ad5 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -4905,16 +4905,31 @@ class 
DagRun(Base, LoggingMixin): dag = self.get_dag() tis = self.get_task_instances(session=session) - # check for removed tasks + # check for removed or restored tasks task_ids = [] for ti in tis: task_ids.append(ti.task_id) + task = None try: - dag.get_task(ti.task_id) + task = dag.get_task(ti.task_id) except AirflowException: - if self.state is not State.RUNNING and not dag.partial: + if ti.state == State.REMOVED: + pass # ti has already been removed, just ignore it + elif self.state is not State.RUNNING and not dag.partial: + self.log.warning("Failed to get task '{}' for dag '{}'. " + "Marking it as removed.".format(ti, dag)) + Stats.incr( + "task_removed_from_dag.{}".format(dag.dag_id), 1, 1) ti.state = State.REMOVED + is_task_in_dag = task is not None + should_restore_task = is_task_in_dag and ti.state == State.REMOVED + if should_restore_task: + self.log.info("Restoring task '{}' which was previously " + "removed from DAG '{}'".format(ti, dag)) + Stats.incr("task_restored_to_dag.{}".format(dag.dag_id), 1, 1) + ti.state = State.NONE + # check for missing tasks for task in six.itervalues(dag.task_dict): if task.adhoc: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8c42d03c/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index 5d8184c..98913af 100644 --- a/tests/models.py +++ b/tests/models.py @@ -892,6 +892,30 @@ class DagRunTest(unittest.TestCase): self.assertTrue(dagrun.is_backfill) self.assertFalse(dagrun2.is_backfill) + def test_removed_task_instances_can_be_restored(self): + def with_all_tasks_removed(dag): + return DAG(dag_id=dag.dag_id, start_date=dag.start_date) + + dag = DAG('test_task_restoration', start_date=DEFAULT_DATE) + dag.add_task(DummyOperator(task_id='flaky_task', owner='test')) + + dagrun = self.create_dag_run(dag) + flaky_ti = dagrun.get_task_instances()[0] + self.assertEquals('flaky_task', flaky_ti.task_id) + self.assertEquals(State.NONE, 
flaky_ti.state) + + dagrun.dag = with_all_tasks_removed(dag) + + dagrun.verify_integrity() + flaky_ti.refresh_from_db() + self.assertEquals(State.REMOVED, flaky_ti.state) + + dagrun.dag.add_task(DummyOperator(task_id='flaky_task', owner='test')) + + dagrun.verify_integrity() + flaky_ti.refresh_from_db() + self.assertEquals(State.NONE, flaky_ti.state) + class DagBagTest(unittest.TestCase): def test_get_existing_dag(self):