evgenyshulman commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422632621



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
        A metaclass introduces backward-compatibility problems for users who already define a metaclass for their operator (they will have to inherit from `BaseOperatorMeta`).
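To make the compatibility concern concrete, here is a toy sketch (not Airflow code): `BaseOperatorMeta` and `BaseOperator` are minimal stand-ins for the classes in this diff, `abc.ABCMeta` plays the role of a metaclass a user already attaches to an operator, and `abcmeta_operator_conflicts` and `ABCOperatorMeta` are hypothetical names. Python rejects the class with a `TypeError` (metaclass conflict) unless the user merges their metaclass with the new one.

```python
import abc


class BaseOperatorMeta(type):
    """Toy stand-in for the metaclass proposed in this PR."""

    def __call__(cls, *args, **kwargs):
        obj = type.__call__(cls, *args, **kwargs)
        # Runs after the whole __init__ chain, wherever super().__init__ was called.
        obj.post_init_ran = True
        return obj


class BaseOperator(metaclass=BaseOperatorMeta):
    pass


def abcmeta_operator_conflicts() -> bool:
    """Return True if an operator with a pre-existing metaclass can no longer be defined."""
    try:
        class MyOperator(BaseOperator, metaclass=abc.ABCMeta):
            pass
    except TypeError:
        # Neither ABCMeta nor BaseOperatorMeta derives from the other: metaclass conflict.
        return True
    return False


# Workaround users would be forced into: a metaclass combining both parents.
class ABCOperatorMeta(BaseOperatorMeta, abc.ABCMeta):
    pass


class MyFixedOperator(BaseOperator, metaclass=ABCOperatorMeta):
    pass
```

`MyFixedOperator()` still gets the post-init hook, but only because the user rewrote their metaclass hierarchy.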
    
    What happens when a user creates an operator without a DAG?
    1. We could fix `set_upstream`, or we could simply finalize the operator when it is added to a DAG (so no metaclass is required).
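A rough sketch of option 1 in a self-contained toy model. `DAG.current`, `add_task`, `resolve_xcomarg_dependencies` and the toy `XComArg` are hypothetical stand-ins, not Airflow's real APIs. Because an operator in this model registers with its DAG from inside the base `__init__` (possibly before a subclass has assigned its template fields), resolution is deferred until the `with` block exits; an operator created outside any DAG is resolved when it is attached later with `add_task`.

```python
from typing import Optional


class XComArg:
    """Toy stand-in: a reference to another operator's output."""

    def __init__(self, operator: "Operator"):
        self.operator = operator


class DAG:
    """Toy DAG that finalizes XComArg dependencies instead of using a metaclass."""

    current: Optional["DAG"] = None  # hypothetical "active DAG" slot

    def __init__(self):
        self.tasks = []
        self._open = False

    def __enter__(self):
        DAG.current = self
        self._open = True
        return self

    def __exit__(self, *exc_info):
        DAG.current = None
        self._open = False
        # Every operator __init__ has returned by now, so template fields are set.
        for task in self.tasks:
            task.resolve_xcomarg_dependencies()
        return False

    def add_task(self, task: "Operator") -> None:
        self.tasks.append(task)
        if not self._open:
            # Attached after construction (e.g. created outside a DAG): resolve now.
            task.resolve_xcomarg_dependencies()


class Operator:
    template_fields = ()

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.upstream = set()
        if DAG.current is not None:
            # Registration happens inside the base __init__, i.e. possibly
            # before a subclass has assigned its template fields.
            DAG.current.add_task(self)

    def resolve_xcomarg_dependencies(self) -> None:
        for name in self.template_fields:
            value = getattr(self, name, None)
            if isinstance(value, XComArg):
                self.upstream.add(value.operator)


class CustomOpSuperBefore(Operator):
    template_fields = ("field",)

    def __init__(self, task_id: str, field):
        super().__init__(task_id)
        self.field = field
```

The trade-off in this design is that dependencies become visible only once the DAG block closes (or the operator is attached), not immediately after construction.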

##########
File path: airflow/models/baseoperator.py
##########
@@ -634,6 +651,33 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def _set_xcomargs_dependencies(self) -> None:
+        """
+        Resolves upstream dependencies of a task. In this way passing an ``XComArg`
+        as value for a template field will result in creating upstream relation between
+        two tasks.
+
+        **Example**: ::
+
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(..., html_content=generate_content.output)
+
+            # This is equivalent to
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(
+                    ..., html_content="{{ task_instance.xcom_pull('generate_content') }}"
+                )
+                generate_content >> send_email
+
+        """
+        from airflow.models.xcom_arg import XComArg
+        for field in self.template_fields:
+            arg = getattr(self, field)
+            if isinstance(arg, XComArg):
+                self.set_upstream(arg.operator)

Review comment:
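       What about `PythonOperator`, where an `XComArg` may be nested inside `op_kwargs`? Template fields are usually traversed with an `operator.render_template` call, which recurses into lists, tuples, sets and dicts; we should probably do the same here instead of checking only the top-level value.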
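A minimal sketch of recursive discovery, assuming a toy `XComArg` stand-in and a hypothetical helper `iter_xcomargs` (neither is Airflow code). It walks the container types that template rendering recurses into (lists, tuples, sets and dict values), so an `XComArg` nested in `op_kwargs` is found where a top-level `isinstance` check would miss it.

```python
from typing import Any, Iterator


class XComArg:
    """Toy stand-in for airflow.models.xcom_arg.XComArg."""

    def __init__(self, operator: str):
        self.operator = operator


def iter_xcomargs(value: Any) -> Iterator[XComArg]:
    """Yield every XComArg nested in lists, tuples, sets and dict values."""
    if isinstance(value, XComArg):
        yield value
    elif isinstance(value, (list, tuple, set)):
        for item in value:
            yield from iter_xcomargs(item)
    elif isinstance(value, dict):
        # Only values are inspected, mirroring how rendering treats dicts.
        for item in value.values():
            yield from iter_xcomargs(item)
    # Strings and other scalars yield nothing.
```

Inside the operator, each yielded argument's `operator` would then be passed to `set_upstream`.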

##########
File path: tests/models/test_baseoperator.py
##########
@@ -262,6 +263,33 @@ def test_email_on_actions(self):
         assert test_task.email_on_retry is False
         assert test_task.email_on_failure is True
 
+    def test_upstream_is_set_when_template_field_is_xcomarg(self):
+        class CustomOpSuperBefore(DummyOperator):
+            template_fields = ("field",)
+
+            @apply_defaults
+            def __init__(self, field, *args, **kwargs):
+                super().__init__(*args, **kwargs)
+                self.field = field
+
+        class CustomOpSuperAfter(DummyOperator):
+            template_fields = ("field",)
+
+            @apply_defaults
+            def __init__(self, field, *args, **kwargs):
+                self.field = field
+                super().__init__(*args, **kwargs)
+
+        with DAG("test_dag", default_args={"start_date": datetime.today()}):
+            op1 = DummyOperator(task_id="op1")
+            op2 = CustomOpSuperBefore(task_id="op2", field=op1.output)
+            op3 = CustomOpSuperAfter(task_id="op3", field=op1.output)
+
+        assert op1 in op2.upstream_list
+        assert op1 in op3.upstream_list
+        assert op2 in op1.downstream_list
+        assert op3 in op1.downstream_list
+

Review comment:
       We need tests for an operator defined outside of a DAG and for `PythonOperator` (`op_kwargs`) usage.



