[
https://issues.apache.org/jira/browse/AIRFLOW-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17024976#comment-17024976
]
Qian Yu commented on AIRFLOW-6657:
----------------------------------
Okay nvm it does look very awkward to implement {{BranchPythonOperator}} as
{{BaseBranchOperator}} or {{BaseBranchOperator}} as {{BranchPythonOperator}}.
They don't mix very well.
I also saw [~chronitis] mentioned this in the PR: "Here's a possible
refactoring; moving the logic part into {{SkipMixin}} and building both
{{BranchOperator}} and {{BranchPythonOperator}} off it. It's possible that
{{BranchOperator}} is unnecessary now and users should just inherit
{{SkipMixin}} directly in this logic."
It does sound like one of {{BranchPythonOperator}} or {{BaseBranchOperator}} is
not necessary. I think we should consider deprecating {{BranchOperator}}.
Reason being, {{BranchPythonOperator}} is pretty convenient and does most of
the branching needed easily by just taking a {{python_callable}}. However, if
someone does need the kind of flexibility to do their own branching, the option
is open to implement {{SkipMixin}} themselves.
Pls let me know what you think.
While reading the PR conversations, I also saw this comment. It probably was
one of the motivation reasons for {{BaseBranchOperator}}. The reason this does
not work was because {{myvar}} was not put in the {{op_kwargs}} of the
{{BranchPythonOperator}}.
This is the example [~chronitis] showed that did not work:
{code:python}
class MyBranch(BranchPythonOperator):
template_fields = ('myvar', )
def __init__(self, myvar, **kwargs):
self.myvar = myvar
super().__init__(python_callable=self.choose_branch)
def choose_branch(self):
# for reasons I'm not totally clear on, but I'm guessing the reference
to `self` before `super()`
# has returned creates a closure, any {{ templates }} in `self.myvar`
here don't get evaluated
return self.myvar
{code}
This should make it work using {{BranchPythonOperator}}:
{code:python}
class MyBranch(BranchPythonOperator):
def __init__(self, myvar, **kwargs):
op_kwargs = {"myvar": myvar}
def python_callable(*py_args, **py_kwargs):
return self.choose_branch(*py_args, **py_kwargs)
super().__init__(python_callable=python_callable, op_kwargs=op_kwargs,
**kwargs)
def choose_branch(self, myvar, *args, **kwargs):
return myvar
{code}
> Implement BranchPythonOperator as BaseBranchOperator
> ----------------------------------------------------
>
> Key: AIRFLOW-6657
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6657
> Project: Apache Airflow
> Issue Type: Improvement
> Components: operators
> Affects Versions: 1.10.4
> Reporter: Qian Yu
> Assignee: Qian Yu
> Priority: Major
>
> This PR added a base class {{BaseBranchOperator}} that is intended to let
> people implement custom branching logic as its subclasses.
> https://github.com/apache/airflow/pull/5231
> However the existing {{BranchPythonOperator}} is implemented on its own, not
> utilizing {{BaseBranchOperator}}.
> This is making other JIRAs difficult to continue. E.g. this one AIRFLOW-5391.
> Should subsequent JIRAs be working on {{BranchPythonOperator}} or
> {{BaseBranchOperator}} or both?
> There was a discussion about implementing {{BranchPythonOperator}} as
> {{BaseBranchOperator}}. However, the discussion did not continue to implement
> it.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)