yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379350352
 
 

 ##########
 File path: airflow/operators/branch_operator.py
 ##########
 @@ -48,4 +48,21 @@ def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
         raise NotImplementedError
 
     def execute(self, context: Dict):
-        self.skip_all_except(context['ti'], self.choose_branch(context))
+        branch = self.choose_branch(context)
+        self.skip_all_except(context['ti'], branch)
+        return branch
+
+    def evaluate_skip_condition(self, xcom_value: Union[str, List[str]], downstream_ti: TaskInstance):
+        """
+        :param xcom_value: The task_id or list of task_ids returned by execute().
+        :type xcom_value: str or list[str]
+        :param downstream_ti: The TaskInstance of the child task
+        :type downstream_ti: airflow.models.TaskInstance
+        :return: True if xcom_value indicates downstream_ti should be skipped. Otherwise False.
+        :rtype: bool
+        """
+        branch = xcom_value
+        if isinstance(branch, str):
+            branch = [branch]
+
+        return downstream_ti.task_id not in branch
 
 Review comment:
  That might actually be one way to do this. There's just one downside I can see.
   
  For `BranchPythonOperator`, the current behaviour in my PR is to store the task_ids to follow (because that is what ends up in XCom).
   
  If we change this to store the list of skipped task_ids in `SkipMixin`, people making incremental changes to their existing DAGs may see some surprising behaviour. For example, suppose the DAG looks like this on 20200101, and Branch follows A and skips B and C. `SkipMixin` stores B and C in the db as its "list of task_ids skipped":
   ```
   Branch >> [A, B, C]
   ```
   
  We then add D on 20200102, and the DAG becomes:
   ```
   Branch >> [A, B, C, D]
   ```
   
  Branch itself is not changed; it still only follows A. But if someone clears a task in the 20200101 DAG run, D will start running, because D is not in the "list of task_ids skipped" recorded by `SkipMixin`.
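
The scenario can be reproduced with a small simulation. This is a sketch, not Airflow code: each policy is modeled as a plain function over the state recorded on 20200101, and `FOLLOWED`, `SKIPPED` and the helper functions are hypothetical names used only for illustration.

```python
from typing import Callable, List, Set

# Hypothetical state recorded on 20200101, when Branch ran against
# Branch >> [A, B, C] and followed only A.
FOLLOWED: Set[str] = {"A"}        # follow-list policy (this PR): the XCom value
SKIPPED: Set[str] = {"B", "C"}    # skip-list policy (suggested): recorded by SkipMixin


def skip_by_follow_list(task_id: str) -> bool:
    # Skip anything the branch did not follow, including tasks added later.
    return task_id not in FOLLOWED


def skip_by_skip_list(task_id: str) -> bool:
    # Skip only the tasks that were explicitly recorded as skipped.
    return task_id in SKIPPED


def tasks_that_run(downstream: List[str], should_skip: Callable[[str], bool]) -> List[str]:
    # Downstream tasks that would be executed after the 20200101 run is cleared.
    return [t for t in downstream if not should_skip(t)]


# On 20200102, D is added (Branch >> [A, B, C, D]); then a task in the 20200101 run is cleared.
downstream = ["A", "B", "C", "D"]
print(tasks_that_run(downstream, skip_by_follow_list))  # ['A']
print(tasks_that_run(downstream, skip_by_skip_list))    # ['A', 'D']
```

Under the skip-list policy, D runs even though Branch only ever followed A.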
   
  I think this behaviour is less intuitive than having Branch automatically skip anything it did not follow (which is what this PR currently does).
   
  However, I do like your suggestion because its implementation is simpler and more elegant than forcing developers to implement an additional `evaluate_skip_condition()`.
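
Where the skip decision lives under each design can be sketched as follows. The classes are simplified, hypothetical stand-ins (not Airflow's `SkipMixin` or `BaseBranchOperator`); only the name `evaluate_skip_condition` comes from this PR, and here it takes a task_id instead of a `TaskInstance`.

```python
from typing import List, Set, Union


class RecordingSkipMixin:
    """Skip-list design (hypothetical): the mixin remembers which downstream
    task_ids it skipped, so one generic check serves every operator using it."""

    def __init__(self) -> None:
        self.skipped: Set[str] = set()

    def record_skips(self, downstream: List[str], follow: Union[str, List[str]]) -> None:
        follow_set = {follow} if isinstance(follow, str) else set(follow)
        # Record exactly the task_ids that were downstream and not followed in this run.
        self.skipped = {t for t in downstream if t not in follow_set}

    def was_skipped(self, task_id: str) -> bool:
        return task_id in self.skipped


class FollowListBranch:
    """Follow-list design (this PR): each operator interprets the value it
    returned from execute() by implementing evaluate_skip_condition()."""

    def evaluate_skip_condition(self, xcom_value: Union[str, List[str]], task_id: str) -> bool:
        # Simplified: takes a task_id rather than a TaskInstance.
        branch = [xcom_value] if isinstance(xcom_value, str) else xcom_value
        return task_id not in branch


mixin = RecordingSkipMixin()
mixin.record_skips(["A", "B", "C"], "A")
print(mixin.was_skipped("B"), mixin.was_skipped("D"))  # True False
op = FollowListBranch()
print(op.evaluate_skip_condition("A", "B"), op.evaluate_skip_condition("A", "D"))  # True True
```

With a recorded skip list, one generic check in the mixin serves every operator; with a follow list, each operator has to supply its own `evaluate_skip_condition()`. The two agree for B and differ only for task_ids that were not downstream when Branch ran, such as D.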
   
   What do you think?
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.