yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379350352
##########
File path: airflow/operators/branch_operator.py
##########
@@ -48,4 +48,21 @@ def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
         raise NotImplementedError
 
     def execute(self, context: Dict):
-        self.skip_all_except(context['ti'], self.choose_branch(context))
+        branch = self.choose_branch(context)
+        self.skip_all_except(context['ti'], branch)
+        return branch
+
+    def evaluate_skip_condition(self, xcom_value: Union[str, List[str]], downstream_ti: TaskInstance):
+        """
+        :param xcom_value: The task_id or list of task_ids returned by execute().
+        :type xcom_value: str or list[str]
+        :param downstream_ti: The TaskInstance of the child task
+        :type downstream_ti: airflow.models.TaskInstance
+        :return: True if xcom_value indicates downstream_ti should be skipped. Otherwise False.
+        :rtype: bool
+        """
+        branch = xcom_value
+        if isinstance(branch, str):
+            branch = [branch]
+
+        return downstream_ti.task_id not in branch
Review comment:
That might actually be one way to do this. There's just one downside I can see.
For `BranchPythonOperator`, the current behaviour in my PR is to store the task_ids to follow (because that is what `execute()` returns, and the return value is what ends up in XCom).
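A minimal, Airflow-free sketch of the decision this PR makes from the XCom value, assuming the XCom holds either a single task_id or a list of task_ids that Branch chose to follow. The helper name `should_skip` is hypothetical; it mirrors the logic of `evaluate_skip_condition()` in the diff but takes a plain task_id instead of a `TaskInstance`.

```python
from typing import List, Union


def should_skip(xcom_value: Union[str, List[str]], downstream_task_id: str) -> bool:
    """Return True if downstream_task_id is not among the followed task_ids.

    Hypothetical helper mirroring evaluate_skip_condition() from the diff.
    """
    # Normalise a single task_id into a one-element list, as the diff does.
    followed = [xcom_value] if isinstance(xcom_value, str) else list(xcom_value)
    return downstream_task_id not in followed


# Branch returned "A", so every other direct downstream task is skipped.
print(should_skip("A", "A"))         # False
print(should_skip("A", "B"))         # True
print(should_skip(["A", "C"], "C"))  # False
```

Because the decision is derived from what Branch followed, any downstream task not in that list is skipped, including tasks that did not exist when the run first executed.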
If we change this to store the list of skipped task_ids in `SkipMixin`, people making incremental changes to their existing DAGs may see some surprising behaviour. For example, suppose the DAG looks like this on 20200101, and Branch follows A and skips B and C. `SkipMixin` stores B and C in the db as its "list of task_ids skipped":
```
Branch >> [A, B, C]
```
We then add D on 20200102, so the DAG becomes:
```
Branch >> [A, B, C, D]
```
Branch itself is unchanged; it still follows only A. But if someone clears a task in the 20200101 DAG run, D will start running, because D is not found in the "list of task_ids skipped" stored by `SkipMixin`.
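The scenario above can be simulated without Airflow. This sketch assumes two hypothetical records: `followed` (the task_ids Branch chose, as this PR keeps in XCom) and `skipped` (the task_ids skipped when the 20200101 run first executed, as the `SkipMixin` suggestion would store). On clear, each strategy is evaluated against the downstream tasks of the current DAG definition; the function names are illustrative only.

```python
from typing import List, Set


def skip_from_followed(followed: Set[str], downstream: List[str]) -> Set[str]:
    # This PR's approach: skip every downstream task Branch did not follow.
    return {task_id for task_id in downstream if task_id not in followed}


def skip_from_skipped(skipped: Set[str], downstream: List[str]) -> Set[str]:
    # SkipMixin suggestion: skip only the task_ids recorded as skipped.
    return {task_id for task_id in downstream if task_id in skipped}


# State recorded when the 20200101 run executed with Branch >> [A, B, C].
followed = {"A"}
skipped = {"B", "C"}

# On 20200102, D is added: Branch >> [A, B, C, D].
downstream = ["A", "B", "C", "D"]

print(sorted(skip_from_followed(followed, downstream)))  # ['B', 'C', 'D']
print(sorted(skip_from_skipped(skipped, downstream)))    # ['B', 'C']
```

With the `skipped` record, D is not in the skip set, so clearing the 20200101 run would let D run even though Branch never followed it.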
I think this behaviour is less intuitive than having Branch automatically skip anything it did not follow (which is what this PR currently does).
However, I do like your suggestion, because its implementation is simpler and more elegant than forcing developers to implement an additional `evaluate_skip_condition()`.
What do you think?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.