feluelle commented on a change in pull request #7276: [AIRFLOW-5391] Do not run
skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r378244504
##########
File path: tests/operators/test_python.py
##########
@@ -541,6 +542,62 @@ def test_xcom_push(self):
self.assertEqual(
ti.xcom_pull(task_ids='make_choice'), 'branch_1')
+ def test_clear_skipped_downstream_task(self):
+ """
+ After a downstream task is skipped by BranchPythonOperator, clearing
the skipped task
+ should not cause it to be executed.
+ """
+ branch_op = BranchPythonOperator(task_id='make_choice',
+ dag=self.dag,
+ python_callable=lambda: 'branch_1')
+ branches = [self.branch_1, self.branch_2]
+ branch_op >> branches
+ self.dag.clear()
+
+ dr = self.dag.create_dagrun(
+ run_id="manual__",
+ start_date=timezone.utcnow(),
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING
+ )
+
+ branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+ for task in branches:
+ task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+ tis = dr.get_task_instances()
+ for ti in tis:
+ if ti.task_id == 'make_choice':
+ self.assertEqual(ti.state, State.SUCCESS)
+ elif ti.task_id == 'branch_1':
+ self.assertEqual(ti.state, State.SUCCESS)
+ elif ti.task_id == 'branch_2':
+ self.assertEqual(ti.state, State.SKIPPED)
+ else:
+ raise Exception
Review comment:
```suggestion
raise ValueError(f'Invalid task id {ti.task_id} found!')
```
WDYT?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services