[ https://issues.apache.org/jira/browse/AIRFLOW-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17028041#comment-17028041 ]

Qian Yu commented on AIRFLOW-6657:
----------------------------------

I renamed the JIRA to be "Deprecate BaseBranchOperator".

 

[~chronitis] [~kaxilnaik] [~ash] I understand that {{BaseBranchOperator}} is
newer than {{BranchPythonOperator}}. However, considering this example and
the {{LatestOnlyOperator}} example I provided in the JIRA description, I really
think {{BranchPythonOperator}} is more concise and offers most, if not all, of
the features of {{BaseBranchOperator}}:

With {{BranchPythonOperator}}, a lambda is sufficient:
{code:python}
branch_op = BranchPythonOperator(task_id='make_choice',
                                 dag=self.dag,
                                 python_callable=lambda: 'branch_1')
{code}
 

With {{BaseBranchOperator}}, a new class has to be created:
{code:python}
class ChooseBranchOne(BaseBranchOperator):
    def choose_branch(self, context):
        return 'branch_1'

branch_op = ChooseBranchOne(task_id="make_choice", dag=self.dag)
{code}
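
To make the comparison concrete without depending on an Airflow install, here is a minimal plain-Python sketch of the two styles. {{ToyBaseBranchOperator}} and {{ToyBranchPythonOperator}} are hypothetical stand-ins that only mimic the shape of the real operators (no scheduling, no skipping, no context handling); both end up returning the same branch id.
{code:python}
from abc import ABC, abstractmethod
from typing import Callable, Dict, List, Union

# Hypothetical toy classes: they only mimic the shape of the two Airflow
# operators and do not import or reproduce Airflow itself.
BranchResult = Union[str, List[str]]


class ToyBaseBranchOperator(ABC):
    """Subclass style: branching logic lives in an overridden choose_branch()."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    @abstractmethod
    def choose_branch(self, context: Dict) -> BranchResult:
        """Return the task_id (or list of task_ids) to follow."""

    def execute(self, context: Dict) -> BranchResult:
        return self.choose_branch(context)


class ToyBranchPythonOperator:
    """Callable style: branching logic is passed in as python_callable."""

    def __init__(self, task_id: str,
                 python_callable: Callable[[], BranchResult]) -> None:
        self.task_id = task_id
        self.python_callable = python_callable

    def execute(self, context: Dict) -> BranchResult:
        # For brevity the toy ignores context; the real operator can pass
        # context values to the callable.
        return self.python_callable()


# Subclass style: a new class is needed just to return a constant.
class ChooseBranchOne(ToyBaseBranchOperator):
    def choose_branch(self, context: Dict) -> BranchResult:
        return 'branch_1'


by_subclass = ChooseBranchOne(task_id='make_choice')

# Callable style: a lambda is enough.
by_lambda = ToyBranchPythonOperator(task_id='make_choice',
                                    python_callable=lambda: 'branch_1')

print(by_subclass.execute({}), by_lambda.execute({}))  # branch_1 branch_1
{code}
The subclass style needs a class definition even for a one-line decision; the callable style keeps the decision next to the task definition.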

> Deprecate BaseBranchOperator
> ----------------------------
>
>                 Key: AIRFLOW-6657
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6657
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: operators
>    Affects Versions: 1.10.4
>            Reporter: Qian Yu
>            Assignee: Qian Yu
>            Priority: Major
>
> This PR (https://github.com/apache/airflow/pull/5231) added a base class
> {{BaseBranchOperator}} intended to let people implement custom branching
> logic by subclassing it.
> However, the existing {{BranchPythonOperator}} is implemented on its own
> rather than on top of {{BaseBranchOperator}}.
> This makes other JIRAs, e.g. AIRFLOW-5391, difficult to continue. Should
> subsequent JIRAs work on {{BranchPythonOperator}}, {{BaseBranchOperator}},
> or both?
> Everything that can be done by deriving from {{BaseBranchOperator}} can also 
> be done by deriving from {{BranchPythonOperator}}, often with more readable 
> code. 
> For example, the only current internal use of {{BaseBranchOperator}} is in
> {{LatestOnlyOperator}}. Its code can be changed to something like the
> following without any loss of flexibility. In fact, the new code may be more
> readable because it does no dictionary look-ups on {{context}}:
> {code:python}
> import pendulum
> from airflow.operators.python import BranchPythonOperator
> class LatestOnlyOperator(BranchPythonOperator):
>     """
>     Allows a workflow to skip tasks that are not running during the most
>     recent schedule interval.
>     If the task is run outside of the latest schedule interval (i.e.
>     external_trigger), all directly downstream tasks will be skipped.
>     Note that downstream tasks are never skipped if the given DAG_Run is
>     marked as externally triggered.
>     """
>     ui_color = '#e9ffdb'  # nyanza
>     def __init__(self, *args, **kwargs):
>         def python_callable(dag_run, task, dag, execution_date, **_):
>             # If the DAG Run is externally triggered, then return without
>             # skipping downstream tasks
>             if dag_run and dag_run.external_trigger:
>                 self.log.info(
>                     "Externally triggered DAG_Run: allowing execution to proceed.")
>                 return list(task.get_direct_relative_ids(upstream=False))
>             now = pendulum.utcnow()
>             left_window = dag.following_schedule(execution_date)
>             right_window = dag.following_schedule(left_window)
>             self.log.info(
>                 'Checking latest only with left_window: %s right_window: %s now: %s',
>                 left_window, right_window, now
>             )
>             if not left_window < now <= right_window:
>                 self.log.info('Not latest execution, skipping downstream.')
>                 # we return an empty list, thus the parent BranchPythonOperator
>                 # won't exclude any downstream tasks from skipping.
>                 return []
>             else:
>                 self.log.info('Latest, allowing execution to proceed.')
>                 return list(task.get_direct_relative_ids(upstream=False))
>         super().__init__(*args, python_callable=python_callable, **kwargs)
> {code}
> In fact, most of the time, users of {{BranchPythonOperator}} don't need to 
> create a new operator class. They only need to pass a
> {{python_callable}} argument. So it's arguable that
> {{BranchPythonOperator}} is actually more convenient and powerful than the
> more recently added {{BaseBranchOperator}}.
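
To check the decision logic of the {{LatestOnlyOperator}} example above in isolation, here is a plain-Python sketch with no Airflow dependency. It assumes a fixed schedule interval in place of {{dag.following_schedule()}}; {{choose_latest_only}} and {{tasks_to_skip}} are hypothetical names, and the skipping is only modelled as a set of task ids, not performed.
{code:python}
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Set, Union


# Assumption: a fixed schedule interval (e.g. daily). The real operator calls
# dag.following_schedule(), which also handles cron expressions.
def following_schedule(dt: datetime, interval: timedelta) -> datetime:
    return dt + interval


def choose_latest_only(execution_date: datetime, now: datetime,
                       interval: timedelta, downstream_ids: Iterable[str],
                       external_trigger: bool = False) -> List[str]:
    """Return the downstream task ids to keep, mirroring the callable above."""
    if external_trigger:
        # Externally triggered runs never skip downstream tasks.
        return list(downstream_ids)
    left_window = following_schedule(execution_date, interval)
    right_window = following_schedule(left_window, interval)
    if not left_window < now <= right_window:
        # Not the latest run: keep nothing, so every downstream task is skipped.
        return []
    return list(downstream_ids)


def tasks_to_skip(downstream_ids: Iterable[str],
                  branch: Union[str, Iterable[str]]) -> Set[str]:
    """Hypothetical helper: direct downstream tasks not named by the branch."""
    keep = {branch} if isinstance(branch, str) else set(branch)
    return set(downstream_ids) - keep


daily = timedelta(days=1)
now = datetime(2020, 1, 2, 12, 0, tzinfo=timezone.utc)
downstream = ['task_a', 'task_b']

# Window for 2020-01-01 is (2020-01-02, 2020-01-03]: now falls inside it.
latest = choose_latest_only(datetime(2020, 1, 1, tzinfo=timezone.utc),
                            now, daily, downstream)
# Window for 2019-12-30 is (2019-12-31, 2020-01-01]: now is past it.
stale = choose_latest_only(datetime(2019, 12, 30, tzinfo=timezone.utc),
                           now, daily, downstream)

print(sorted(tasks_to_skip(downstream, latest)))  # []
print(sorted(tasks_to_skip(downstream, stale)))   # ['task_a', 'task_b']
{code}
Returning an empty list for a stale run is what turns "keep nothing" into "skip every direct downstream task", matching the comment in the operator code.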



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
