[ 
https://issues.apache.org/jira/browse/AIRFLOW-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17024976#comment-17024976
 ] 

Qian Yu commented on AIRFLOW-6657:
----------------------------------

Okay nvm it does look very awkward to implement {{BranchPythonOperator}} as 
{{BaseBranchOperator}} or {{BaseBranchOperator}} as {{BranchPythonOperator}}. 
They don't mix very well.

 
 I also saw [~chronitis] mentioned this in the PR: "Here's a possible 
refactoring; moving the logic part into {{SkipMixin}} and building both 
{{BranchOperator}} and {{BranchPythonOperator}} off it. It's possible that 
{{BranchOperator}} is unnecessary now and users should just inherit 
{{SkipMixin}} directly in this logic."

 

It does sound like one of {{BranchPythonOperator}} or {{BaseBranchOperator}} is 
not necessary. I think we should consider deprecating {{BranchOperator}}. 
Reason being, {{BranchPythonOperator}} is pretty convenient and does most of 
the branching needed easily by just taking a {{python_callable}}. However, if 
someone does need the kind of flexibility to do their own branching, the option 
is open to implement {{SkipMixin}} themselves.

Pls let me know what you think.

While reading the PR conversations, I also saw this comment. It probably was 
one of the motivation reasons for {{BaseBranchOperator}}. The reason this does 
not work was because {{myvar}} was not put in the {{op_kwargs}} of the 
{{BranchPythonOperator}}.

This is the example [~chronitis] showed that did not work:
{code:python}
class MyBranch(BranchPythonOperator):
    template_fields = ('myvar', )
    def __init__(self, myvar, **kwargs):
       self.myvar = myvar
       super().__init__(python_callable=self.choose_branch)

    def choose_branch(self):
        # for reasons I'm not totally clear on, but I'm guessing the reference 
to `self` before `super()`
        # has returned creates a closure, any {{ templates }} in `self.myvar` 
here don't get evaluated
        return self.myvar
{code}
This should make it work using {{BranchPythonOperator}}:
{code:python}
class MyBranch(BranchPythonOperator):
    def __init__(self, myvar, **kwargs):
       op_kwargs = {"myvar": myvar}

       def python_callable(*py_args, **py_kwargs):
           return self.choose_branch(*py_args, **py_kwargs)

       super().__init__(python_callable=python_callable, op_kwargs=op_kwargs, 
**kwargs)

    def choose_branch(self, myvar, *args, **kwargs):
        return myvar
{code}

> Implement BranchPythonOperator as BaseBranchOperator
> ----------------------------------------------------
>
>                 Key: AIRFLOW-6657
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6657
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: operators
>    Affects Versions: 1.10.4
>            Reporter: Qian Yu
>            Assignee: Qian Yu
>            Priority: Major
>
> This PR added a base class {{BaseBranchOperator}} that is intended to let 
> people implement custom branching logic as its subclasses. 
> https://github.com/apache/airflow/pull/5231
> However the existing {{BranchPythonOperator}} is implemented on its own, not 
> utilizing {{BaseBranchOperator}}.
> This is making other JIRAs difficult to continue. E.g. this one AIRFLOW-5391. 
> Should subsequent JIRAs be working on {{BranchPythonOperator}} or 
> {{BaseBranchOperator}} or both?
> There was a discussion about implementing {{BranchPythonOperator}} as 
> {{BaseBranchOperator}}. However, the discussion did not continue to implement 
> it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to