ashb commented on a change in pull request #6954: [AIRFLOW-4355] removed task 
should not lead to dagrun success
URL: https://github.com/apache/airflow/pull/6954#discussion_r365205806
 
 

 ##########
 File path: tests/models/test_dagrun.py
 ##########
 @@ -559,3 +561,77 @@ def with_all_tasks_removed(dag):
         dagrun.verify_integrity()
         flaky_ti.refresh_from_db()
         self.assertEqual(State.NONE, flaky_ti.state)
+
+    @conf_vars({
+        ('scheduler', 'removed_tasks_lead_to_dagrun_failure'): 'True'
+    })
+    def test_dagrun_removed_tasks_lead_to_dagrun_failure_true(self):
+        session = settings.Session()
+        on_failure_callback = mock.MagicMock()
+        dag = DAG(
+            'test_dagrun_removed_tasks_lead_to_dagrun_failure_true',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'},
+            on_failure_callback=on_failure_callback
+        )
+        dag.clear()
+        with dag:
+            op1 = DummyOperator(task_id='A')
+            op2 = DummyOperator(task_id='B')
+            op2.set_upstream(op1)
+
+        dag.clear()
+        now = timezone.utcnow()
+        dr = dag.create_dagrun(run_id='test_dagrun_deadlock1' + 
now.isoformat(),
+                               state=State.RUNNING,
+                               execution_date=timezone.datetime(2019, 1, 11),
+                               start_date=now)
+
+        ti_op1 = dr.get_task_instance(task_id=op1.task_id)
+        ti_op1.set_state(state=State.REMOVED, session=session)
+        ti_op2 = dr.get_task_instance(task_id=op2.task_id)
+        ti_op2.set_state(state=State.SUCCESS, session=session)
+        del dag.task_dict[ti_op1.task_id]
+
+        dr.update_state()
+        self.assertEqual(dr.state, State.FAILED)
+        kall = on_failure_callback
+        callback_context = kall.call_args[0][0]
+        self.assertEqual('removed_tasks', callback_context['reason'])
+
+    @conf_vars({
+        ('scheduler', 'removed_tasks_lead_to_dagrun_failure'): 'False'
+    })
+    def test_dagrun_removed_tasks_lead_to_dagrun_failure_false(self):
+        session = settings.Session()
+        on_failure_callback = mock.MagicMock()
+        dag = DAG(
+            'test_dagrun_removed_tasks_lead_to_dagrun_failure_false',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'},
+            on_success_callback=on_failure_callback
+        )
+        dag.clear()
+        with dag:
+            op1 = DummyOperator(task_id='A')
+            op2 = DummyOperator(task_id='B')
+            op2.set_upstream(op1)
+
+        dag.clear()
 
 Review comment:
   We shouldn't need `dag.clear()` in here twice

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to