[
https://issues.apache.org/jira/browse/AIRFLOW-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17028259#comment-17028259
]
ASF GitHub Bot commented on AIRFLOW-6657:
-----------------------------------------
yuqian90 commented on pull request #7331: [AIRFLOW-6657] Deprecate branch
operator
URL: https://github.com/apache/airflow/pull/7331
Everything that can be done by deriving from ``BaseBranchOperator`` can also
be done using ``BranchPythonOperator`` with more concise and readable code.
E.g. ``LatestOnlyOperator`` can be implemented from ``BranchPythonOperator``
easily. The test tasks in ``test_branch_operator.py`` are also better
implemented using ``BranchPythonOperator`` like the ones in
``tests/operators/test_python.py``
The changes in this PR to ``LatestOnlyOperator`` does not require additional
testing because there's already a ``test_latest_only_operator.py`` that works
fine after the change.
---
Issue link: WILL BE INSERTED BY
[boring-cyborg](https://github.com/kaxil/boring-cyborg)
Make sure to mark the boxes below before creating PR: [x]
- [ ] Description above provides context of the change
- [ ] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN =
JIRA ID<sup>*</sup>
- [ ] Unit tests coverage for changes (not needed for documentation changes)
- [ ] Commits follow "[How to write a good git commit
message](http://chris.beams.io/posts/git-commit/)"
- [ ] Relevant documentation is updated including usage instructions.
- [ ] I will engage committers as explained in [Contribution Workflow
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
<sup>*</sup> For document-only changes commit message can start with
`[AIRFLOW-XXXX]`.
---
In case of fundamental code change, Airflow Improvement Proposal
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party
License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
Read the [Pull Request
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
for more information.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Deprecate BaseBranchOperator
> ----------------------------
>
> Key: AIRFLOW-6657
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6657
> Project: Apache Airflow
> Issue Type: Improvement
> Components: operators
> Affects Versions: 1.10.4
> Reporter: Qian Yu
> Assignee: Qian Yu
> Priority: Major
>
> This PR added a base class {{BaseBranchOperator}} that is intended to let
> people implement custom branching logic as its subclasses.
> https://github.com/apache/airflow/pull/5231
> However the existing {{BranchPythonOperator}} is implemented on its own, not
> utilizing {{BaseBranchOperator}}.
> This is making other JIRAs difficult to continue. E.g. this one AIRFLOW-5391.
> Should subsequent JIRAs be working on {{BranchPythonOperator}} or
> {{BaseBranchOperator}} or both?
> Everything that can be done by deriving from {{BaseBranchOperator}} can also
> be done by deriving from {{BranchPythonOperator}}, often with more readable
> code.
> For example, the only current internal use of {{BaseBranchOperator}} is in
> {{LatestOnlyOperator}}. The code can be changed to something like this
> without any loss of flexibility. In fact the new code may look more readable
> because it has no dictionary look-ups from {{context}}:
> {code:python}
> from airflow.operators.python import BranchPythonOperator
> class LatestOnlyOperator(BranchPythonOperator):
> """
> Allows a workflow to skip tasks that are not running during the most
> recent schedule interval.
> If the task is run outside of the latest schedule interval (i.e.
> external_trigger),
> all directly downstream tasks will be skipped.
> Note that downstream tasks are never skipped if the given DAG_Run is
> marked as externally triggered.
> """
> ui_color = '#e9ffdb' # nyanza
> def __init__(self, *args, **kwargs):
> def python_callable(dag_run, task, dag, execution_date, **_):
> # If the DAG Run is externally triggered, then return without
> # skipping downstream tasks
> if dag_run and dag_run.external_trigger:
> self.log.info(
> "Externally triggered DAG_Run: allowing execution to
> proceed.")
> return list(task.get_direct_relative_ids(upstream=False))
> now = pendulum.utcnow()
> left_window = dag.following_schedule(execution_date)
> right_window = dag.following_schedule(left_window)
> self.log.info(
> 'Checking latest only with left_window: %s right_window: %s
> now: %s',
> left_window, right_window, now
> )
> if not left_window < now <= right_window:
> self.log.info('Not latest execution, skipping downstream.')
> # we return an empty list, thus the parent BaseBranchOperator
> # won't exclude any downstream tasks from skipping.
> return []
> else:
> self.log.info('Latest, allowing execution to proceed.')
> return list(task.get_direct_relative_ids(upstream=False))
> super().__init__(python_callable=python_callable, *args, **kwargs)
> {code}
> In fact, most of the time, users of {{BranchPythonOperator}} don't need to
> create a new operator class. They merely need to provide a
> {{python_callable}} callable argument. So it's arguable that
> {{BranchPythonOperator}} is actually more convenient and powerful than the
> more recently added {{BaseBranchOperator}}.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)