Repository: incubator-airflow Updated Branches: refs/heads/master 9ad144657 -> d7d9f883e
[AIRFLOW-932][AIRFLOW-932][AIRFLOW-921][AIRFLOW-910] Do not mark tasks removed when backfilling. In a backfill one can specify a specific task to execute. We create a subdag containing a subset of the original tasks from the original dag. The subdag has the same name as the original dag. This breaks the integrity check of a dag_run, because the tasks that were left out of the subdag are suddenly no longer in scope. Closes #2122 from bolkedebruin/AIRFLOW-921 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d7d9f883 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d7d9f883 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d7d9f883 Branch: refs/heads/master Commit: d7d9f883e4db3d864eb88ce510988456e8d468c4 Parents: 9ad1446 Author: Bolke de Bruin <[email protected]> Authored: Sat Mar 11 10:52:07 2017 -0800 Committer: Bolke de Bruin <[email protected]> Committed: Sat Mar 11 10:52:07 2017 -0800 ---------------------------------------------------------------------- airflow/jobs.py | 1 + airflow/models.py | 12 +++++++++++- tests/jobs.py | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d7d9f883/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index f05104a..c7db99f 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1803,6 +1803,7 @@ class BackfillJob(BaseJob): # explictely mark running as we can fill gaps run.state = State.RUNNING + run.run_id = run_id run.verify_integrity(session=session) # check if we have orphaned tasks http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d7d9f883/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py 
b/airflow/models.py index 1f1667e..ed483f5 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -2696,6 +2696,8 @@ class DAG(BaseDag, LoggingMixin): self.orientation = orientation self.catchup = catchup + self.partial = False + self._comps = { 'dag_id', 'task_ids', @@ -3201,6 +3203,10 @@ class DAG(BaseDag, LoggingMixin): tid for tid in t._upstream_task_ids if tid in dag.task_ids] t._downstream_task_ids = [ tid for tid in t._downstream_task_ids if tid in dag.task_ids] + + if len(dag.tasks) < len(self.tasks): + dag.partial = True + return dag def has_task(self, task_id): @@ -3969,6 +3975,9 @@ class DagRun(Base): else: tis = tis.filter(TI.state.in_(state)) + if self.dag and self.dag.partial: + tis = tis.filter(TI.task_id.in_(self.dag.task_ids)) + return tis.all() @provide_session @@ -4029,6 +4038,7 @@ class DagRun(Base): """ dag = self.get_dag() + tis = self.get_task_instances(session=session) logging.info("Updating state for {} considering {} task(s)" @@ -4113,7 +4123,7 @@ class DagRun(Base): try: dag.get_task(ti.task_id) except AirflowException: - if self.state is not State.RUNNING: + if self.state is not State.RUNNING and not dag.partial: ti.state = State.REMOVED # check for missing tasks http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d7d9f883/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 00d7829..2dc9108 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -42,6 +42,8 @@ from tests.executors.test_executor import TestExecutor from airflow import configuration configuration.load_test_config() +import sqlalchemy + try: from unittest import mock except ImportError: @@ -294,6 +296,53 @@ class BackfillJobTest(unittest.TestCase): self.assertEqual(ti.state, State.SUCCESS) dag.clear() + def test_sub_set_subdag(self): + dag = DAG( + 'test_sub_set_subdag', + start_date=DEFAULT_DATE, + default_args={'owner': 'owner1'}) + + with dag: + op1 = DummyOperator(task_id='leave1') 
+ op2 = DummyOperator(task_id='leave2') + op3 = DummyOperator(task_id='upstream_level_1') + op4 = DummyOperator(task_id='upstream_level_2') + op5 = DummyOperator(task_id='upstream_level_3') + # order randomly + op2.set_downstream(op3) + op1.set_downstream(op3) + op4.set_downstream(op5) + op3.set_downstream(op4) + + dag.clear() + dr = dag.create_dagrun(run_id="test", + state=State.SUCCESS, + execution_date=DEFAULT_DATE, + start_date=DEFAULT_DATE) + + executor = TestExecutor(do_update=True) + sub_dag = dag.sub_dag(task_regex="leave*", + include_downstream=False, + include_upstream=False) + job = BackfillJob(dag=sub_dag, + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE, + executor=executor) + job.run() + + self.assertRaises(sqlalchemy.orm.exc.NoResultFound, dr.refresh_from_db) + # the run_id should have changed, so a refresh won't work + drs = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE) + dr = drs[0] + + self.assertEqual(BackfillJob.ID_FORMAT_PREFIX.format(DEFAULT_DATE.isoformat()), + dr.run_id) + for ti in dr.get_task_instances(): + if ti.task_id == 'leave1' or ti.task_id == 'leave2': + self.assertEqual(State.SUCCESS, ti.state) + else: + self.assertEqual(State.NONE, ti.state) + class SchedulerJobTest(unittest.TestCase): # These defaults make the test faster to run
