turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422858966
##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
ScheduleInterval = Union[str, timedelta, relativedelta]
+class BaseOperatorMeta(type):
+ """
+ Base metaclass of BaseOperator.
+ """
+ def __call__(cls, *args, **kwargs):
+ """
+ Called when you call BaseOperator(). In this way we are able to
perform an action
+ after initializing an operator no matter where the
``super().__init__`` is called
+ (before or after assign of new attributes in a custom operator).
+ """
+ obj = type.__call__(cls, *args, **kwargs)
+ # Set upstream task defined by XComArgs passed to template fields of
an operator
+ obj._set_xcomargs_dependencies() # pylint: disable=protected-access
Review comment:
> I would vote for dagbag "post" action.
This would require some additional changes to make it work everywhere. For
example, running `dag.run()` doesn't create DagBag thus the relation will not
be set.
> people can assign values not only after super().init in subclass, but also
in the DAG definition file as well.
I am not sure I got it. To use `a.output` you have to first define `a` so
there's where we resolve the upstream for the `a`. Using "functional" approach,
users will have to define upstream tasks of an `a` task before he defines the
`a` otherwise they will not be able to use `output` parameter.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]