ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run
skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379329062
##########
File path: airflow/operators/branch_operator.py
##########
@@ -48,4 +48,21 @@ def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
         raise NotImplementedError

     def execute(self, context: Dict):
-        self.skip_all_except(context['ti'], self.choose_branch(context))
+        branch = self.choose_branch(context)
+        self.skip_all_except(context['ti'], branch)
+        return branch
+
+    def evaluate_skip_condition(self, xcom_value: Union[str, List[str]], downstream_ti: TaskInstance):
+        """
+        :param xcom_value: The task_id of list of task_id returned by execute().
+        :type xcom_value: str or list[str]
+        :param downstream_ti: The TaskInstance of child task
+        :type downstream_ti: airflow.models.TaskInstance
+        :return: True if xcom_value indicates downstream_ti should be skipped. Otherwise False.
+        :rtype: bool
+        """
+        branch = xcom_value
+        if isinstance(branch, str):
+            branch = [branch]
+
+        return downstream_ti.task_id not in branch
Review comment:
Rather than having this logic here and needing to re-implement it in every
operator, what do you think about having the `skip` and `skip_all_except`
methods in the mixin store the list of tasks they have skipped in XCom? That
way this evaluate function doesn't need to be overridden in each subclass; the
parent mixin can handle it for all cases.
That would possibly make us store more data in XCom, though, so maybe that's
not the best plan.
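
A rough sketch of the idea above, to make the trade-off concrete. It is not the real `SkipMixin`: the class names, the `should_skip` helper, the `skipped_task_ids` key, and the in-memory `FAKE_XCOM` dict are all hypothetical stand-ins so the example runs without Airflow, and it only considers direct downstream tasks.

```python
from typing import Dict, Iterable, List, Tuple, Union

# Hypothetical stand-in for XCom storage: (task_id, key) -> value.
FAKE_XCOM: Dict[Tuple[str, str], List[str]] = {}

# Hypothetical key under which the mixin records what it skipped.
SKIPPED_KEY = "skipped_task_ids"


class SkipMixinSketch:
    """Illustrative only; not the real airflow.models.SkipMixin."""

    task_id: str
    downstream_task_ids: List[str]

    def skip(self, task_ids: Iterable[str]) -> None:
        # Record exactly which tasks were skipped, so the decision can be
        # replayed later (for example when a downstream task is cleared).
        FAKE_XCOM[(self.task_id, SKIPPED_KEY)] = sorted(task_ids)

    def skip_all_except(self, branch_task_ids: Union[str, Iterable[str]]) -> None:
        # Accept a single task_id or several, as choose_branch() may return either.
        if isinstance(branch_task_ids, str):
            branch_task_ids = [branch_task_ids]
        keep = set(branch_task_ids)
        self.skip(t for t in self.downstream_task_ids if t not in keep)

    def should_skip(self, downstream_task_id: str) -> bool:
        # Generic check that lives in the mixin; subclasses do not override it.
        skipped = FAKE_XCOM.get((self.task_id, SKIPPED_KEY), [])
        return downstream_task_id in skipped


class BranchSketch(SkipMixinSketch):
    def __init__(self, task_id: str, downstream_task_ids: List[str]) -> None:
        self.task_id = task_id
        self.downstream_task_ids = downstream_task_ids


# Usage: the branch keeps "b", so "a" and "c" are recorded as skipped.
op = BranchSketch("branch", ["a", "b", "c"])
op.skip_all_except("b")
```

With this shape the stored value is the list of skipped tasks rather than the chosen branch, which is where the extra XCom data would come from: a branch with many downstream tasks stores many task_ids instead of one.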