[
https://issues.apache.org/jira/browse/AIRFLOW-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17028041#comment-17028041
]
Qian Yu commented on AIRFLOW-6657:
----------------------------------
I renamed the JIRA to be "Deprecate BaseBranchOperator".
[~chronitis] [~kaxilnaik] [~ash] I understand that {{BaseBranchOperator}} is
newer than {{BranchPythonOperator}}. However, if we consider this example and
the {{LatestOnlyOperator}} example I provided in the JIRA description, I really
think {{BranchPythonOperator}} is actually more concise and has most if not all
the features offered by {{{{BaseBranchOperator}}}}:
With {{BranchPythonOperator, a lambda is sufficient}}:
{code:python}
branch_op = BranchPythonOperator(task_id='make_choice',
dag=self.dag,
python_callable=lambda: 'branch_1')
{code}
With {{BaseBranchOperator, a new class has to be created}}:
{code:python}
class ChooseBranchOne(BaseBranchOperator):
def choose_branch(self, context):
return 'branch_1'
branch_op = ChooseBranchOne(task_id="make_choice", dag=self.dag)
{code}
> Deprecate BaseBranchOperator
> ----------------------------
>
> Key: AIRFLOW-6657
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6657
> Project: Apache Airflow
> Issue Type: Improvement
> Components: operators
> Affects Versions: 1.10.4
> Reporter: Qian Yu
> Assignee: Qian Yu
> Priority: Major
>
> This PR added a base class {{BaseBranchOperator}} that is intended to let
> people implement custom branching logic as its subclasses.
> https://github.com/apache/airflow/pull/5231
> However the existing {{BranchPythonOperator}} is implemented on its own, not
> utilizing {{BaseBranchOperator}}.
> This is making other JIRAs difficult to continue. E.g. this one AIRFLOW-5391.
> Should subsequent JIRAs be working on {{BranchPythonOperator}} or
> {{BaseBranchOperator}} or both?
> Everything that can be done by deriving from {{BaseBranchOperator}} can also
> be done by deriving from {{BranchPythonOperator}}, often with more readable
> code.
> For example, the only current internal use of {{BaseBranchOperator}} is in
> {{LatestOnlyOperator}}. The code can be changed to something like this
> without any loss of flexibility. In fact the new code may look more readable
> because it has no dictionary look-ups from {{context}}:
> {code:python}
> from airflow.operators.python import BranchPythonOperator
> class LatestOnlyOperator(BranchPythonOperator):
> """
> Allows a workflow to skip tasks that are not running during the most
> recent schedule interval.
> If the task is run outside of the latest schedule interval (i.e.
> external_trigger),
> all directly downstream tasks will be skipped.
> Note that downstream tasks are never skipped if the given DAG_Run is
> marked as externally triggered.
> """
> ui_color = '#e9ffdb' # nyanza
> def __init__(self, *args, **kwargs):
> def python_callable(dag_run, task, dag, execution_date, **_):
> # If the DAG Run is externally triggered, then return without
> # skipping downstream tasks
> if dag_run and dag_run.external_trigger:
> self.log.info(
> "Externally triggered DAG_Run: allowing execution to
> proceed.")
> return list(task.get_direct_relative_ids(upstream=False))
> now = pendulum.utcnow()
> left_window = dag.following_schedule(execution_date)
> right_window = dag.following_schedule(left_window)
> self.log.info(
> 'Checking latest only with left_window: %s right_window: %s
> now: %s',
> left_window, right_window, now
> )
> if not left_window < now <= right_window:
> self.log.info('Not latest execution, skipping downstream.')
> # we return an empty list, thus the parent BaseBranchOperator
> # won't exclude any downstream tasks from skipping.
> return []
> else:
> self.log.info('Latest, allowing execution to proceed.')
> return list(task.get_direct_relative_ids(upstream=False))
> super().__init__(python_callable=python_callable, *args, **kwargs)
> {code}
> In fact, most of the time, users of {{BranchPythonOperator}} don't need to
> create a new operator class. They merely need to provide a
> {{python_callable}} callable argument. So it's arguable that
> {{BranchPythonOperator}} is actually more convenient and powerful than the
> more recently added {{BaseBranchOperator}}.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)