[ https://issues.apache.org/jira/browse/AIRFLOW-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Qian Yu updated AIRFLOW-6657:
-----------------------------
Description:
This PR added a base class {{BaseBranchOperator}}, intended to let people
implement custom branching logic by subclassing it:
https://github.com/apache/airflow/pull/5231
However, the existing {{BranchPythonOperator}} is implemented independently
and does not derive from {{BaseBranchOperator}}.
This makes other JIRAs, e.g. AIRFLOW-5391, difficult to move forward.
Should subsequent JIRAs work on {{BranchPythonOperator}},
{{BaseBranchOperator}}, or both?
Everything that can be done by deriving from {{BaseBranchOperator}} can also be
done by deriving from {{BranchPythonOperator}}, often with more readable code.
For example, the only current internal use of {{BaseBranchOperator}} is in
{{LatestOnlyOperator}}. Its code could be rewritten as follows without any loss
of flexibility. The new version is arguably more readable, too, because it has
no dictionary look-ups on {{context}}:
{code:python}
import pendulum

from airflow.operators.python import BranchPythonOperator


class LatestOnlyOperator(BranchPythonOperator):
    """
    Allows a workflow to skip tasks that are not running during the most
    recent schedule interval.

    If the task is run outside of the latest schedule interval, all directly
    downstream tasks will be skipped.

    Note that downstream tasks are never skipped if the given DAG_Run is
    marked as externally triggered.
    """

    ui_color = '#e9ffdb'  # nyanza

    def __init__(self, *args, **kwargs):
        # The branching logic is a closure, so it can still use self.log.
        # Named parameters are bound from the task context; **_ absorbs the rest.
        def python_callable(dag_run, task, dag, execution_date, **_):
            # If the DAG Run is externally triggered, then return without
            # skipping downstream tasks
            if dag_run and dag_run.external_trigger:
                self.log.info(
                    "Externally triggered DAG_Run: allowing execution to proceed.")
                return list(task.get_direct_relative_ids(upstream=False))

            now = pendulum.utcnow()
            left_window = dag.following_schedule(execution_date)
            right_window = dag.following_schedule(left_window)
            self.log.info(
                'Checking latest only with left_window: %s right_window: %s now: %s',
                left_window, right_window, now
            )

            if not left_window < now <= right_window:
                self.log.info('Not latest execution, skipping downstream.')
                # Returning an empty list means no downstream task is excluded
                # from skipping, so all of them are skipped.
                return []

            self.log.info('Latest, allowing execution to proceed.')
            return list(task.get_direct_relative_ids(upstream=False))

        super().__init__(*args, python_callable=python_callable, **kwargs)
{code}
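The core of the rewritten operator is the window test {{left_window < now <= right_window}}. The sketch below isolates that decision in plain Python so it can be checked without a scheduler. It assumes a fixed daily schedule; {{following_schedule()}} and {{is_latest_run()}} are hypothetical helpers standing in for {{dag.following_schedule()}} and the callable above, not Airflow APIs.

{code:python}
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumption: a fixed daily schedule replaces dag.following_schedule().
INTERVAL = timedelta(days=1)


def following_schedule(dt: datetime) -> datetime:
    """Hypothetical helper: next schedule tick after dt for a fixed interval."""
    return dt + INTERVAL


def is_latest_run(execution_date: datetime,
                  now: Optional[datetime] = None,
                  externally_triggered: bool = False) -> bool:
    """Mirror the LatestOnlyOperator decision: True means downstream tasks run."""
    if externally_triggered:
        # Externally triggered runs never skip downstream tasks.
        return True
    if now is None:
        now = datetime.now(timezone.utc)
    # A run for execution_date is the latest one while "now" lies in
    # (next tick, tick after that].
    left_window = following_schedule(execution_date)
    right_window = following_schedule(left_window)
    return left_window < now <= right_window


run = datetime(2020, 1, 1, tzinfo=timezone.utc)
print(is_latest_run(run, now=datetime(2020, 1, 2, 12, tzinfo=timezone.utc)))  # True
print(is_latest_run(run, now=datetime(2020, 1, 5, tzinfo=timezone.utc)))      # False
{code}

Because the left bound is strict and the right bound inclusive, a check made exactly at the next tick is not treated as latest, while one made exactly at the tick after that is.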
In fact, most of the time, users of {{BranchPythonOperator}} don't need to
create a new operator class at all; they only need to pass a callable as the
{{python_callable}} argument. So, arguably, {{BranchPythonOperator}} is more
convenient and more powerful than the more recently added
{{BaseBranchOperator}}.
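To illustrate why the callable style avoids {{context['...']}} look-ups: if the whole context is passed to the callable as keyword arguments, as the {{LatestOnlyOperator}} example relies on, Python binds the keys the callable names as parameters and {{**_}} absorbs the rest. In this minimal sketch, {{FakeDagRun}}, {{branch_callable}}, {{invoke}}, and the task ids are hypothetical; {{invoke}} is a toy stand-in for the operator, whose real calling convention depends on the Airflow version.

{code:python}
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class FakeDagRun:
    """Hypothetical stand-in for a DagRun; only the attribute used below."""
    external_trigger: bool


def branch_callable(dag_run: FakeDagRun, ds: str, **_: Any) -> List[str]:
    """Branch logic written against named parameters, not context['...']."""
    if dag_run.external_trigger:
        return ["process", "report"]
    # Hypothetical rule: only build the report on the first day of a month.
    return ["process", "report"] if ds.endswith("-01") else ["process"]


def invoke(python_callable: Callable[..., List[str]],
           context: Dict[str, Any]) -> List[str]:
    """Toy invoker: unpack the whole context as keyword arguments."""
    return python_callable(**context)


context = {
    "dag_run": FakeDagRun(external_trigger=False),
    "ds": "2020-01-15",
    "execution_date": None,  # unused keys are swallowed by **_
    "ti": object(),
}
print(invoke(branch_callable, context))  # ['process']
{code}

The callable is an ordinary function, so no operator subclass is needed to express the branching rule.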
was:
This PR added a base class {{BaseBranchOperator}}, intended to let people
implement custom branching logic by subclassing it:
https://github.com/apache/airflow/pull/5231
However, the existing {{BranchPythonOperator}} is implemented independently
and does not derive from {{BaseBranchOperator}}.
This makes other JIRAs, e.g. AIRFLOW-5391, difficult to move forward.
Should subsequent JIRAs work on {{BranchPythonOperator}},
{{BaseBranchOperator}}, or both?
There was a discussion about reimplementing {{BranchPythonOperator}} as a
subclass of {{BaseBranchOperator}}, but that work was never carried out.
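For comparison, the refactoring discussed above would make {{BranchPythonOperator}} a thin subclass whose {{choose_branch()}} simply calls the user's callable. The following is a toy model of that shape only: {{ToyBaseBranch}} and {{ToyBranchPython}} are hypothetical classes, not Airflow's, and {{execute()}} here just returns the task ids that would be skipped instead of touching any task state.

{code:python}
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, List, Union

Context = Dict[str, Any]
Branch = Union[str, Iterable[str]]


class ToyBaseBranch(ABC):
    """Hypothetical stand-in for BaseBranchOperator."""

    def __init__(self, downstream_task_ids: List[str]) -> None:
        self.downstream_task_ids = downstream_task_ids

    @abstractmethod
    def choose_branch(self, context: Context) -> Branch:
        """Return the task id (or ids) to follow."""

    def execute(self, context: Context) -> List[str]:
        """Return the downstream task ids that would be marked as skipped."""
        chosen = self.choose_branch(context)
        follow = {chosen} if isinstance(chosen, str) else set(chosen)
        return [t for t in self.downstream_task_ids if t not in follow]


class ToyBranchPython(ToyBaseBranch):
    """BranchPythonOperator-like class built on the base class:
    choose_branch() only delegates to the user-supplied callable."""

    def __init__(self, python_callable: Callable[..., Branch],
                 downstream_task_ids: List[str]) -> None:
        super().__init__(downstream_task_ids)
        self.python_callable = python_callable

    def choose_branch(self, context: Context) -> Branch:
        return self.python_callable(**context)


op = ToyBranchPython(lambda hour, **_: "morning" if hour < 12 else "evening",
                     ["morning", "evening"])
print(op.execute({"hour": 9}))  # ['evening']
{code}

In this shape the subclass adds no branching machinery of its own, which is why either class could in principle carry the shared skip logic.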
> Deprecate BaseBranchOperator
> ----------------------------
>
> Key: AIRFLOW-6657
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6657
> Project: Apache Airflow
> Issue Type: Improvement
> Components: operators
> Affects Versions: 1.10.4
> Reporter: Qian Yu
> Assignee: Qian Yu
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)