ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786222539
##########
File path: airflow/models/baseoperator.py
##########
@@ -1706,25 +1701,49 @@ def _validate_kwarg_names_for_mapping(cls:
Type[BaseOperator], func_name: str, v
class MappedOperator(DAGNode):
"""Object representing a mapped operator in a DAG"""
- operator_class: Type[BaseOperator] = attr.ib(repr=lambda c: c.__name__)
+ def __repr__(self) -> str:
+ return (
+ f'MappedOperator(task_type={self.task_type}, '
+ + f'task_id={self.task_id!r},
partial_kwargs={self.partial_kwargs!r}, '
+ + f'mapped_kwargs={self.mapped_kwargs!r}, dag={self.dag})'
+ )
+
+ operator_class: Union[Type[BaseOperator], str]
task_type: str = attr.ib()
task_id: str
partial_kwargs: Dict[str, Any]
mapped_kwargs: Dict[str, Any] = attr.ib(
validator=lambda self, _, v:
_validate_kwarg_names_for_mapping(self.operator_class, "map", v)
)
dag: Optional["DAG"] = None
- upstream_task_ids: Set[str] = attr.ib(factory=set, repr=False)
- downstream_task_ids: Set[str] = attr.ib(factory=set, repr=False)
-
- task_group: Optional["TaskGroup"] = attr.ib(repr=False)
+ upstream_task_ids: Set[str] = attr.ib(factory=set)
+ downstream_task_ids: Set[str] = attr.ib(factory=set)
+ task_group: Optional["TaskGroup"] = attr.ib()
# BaseOperator-like interface -- needed so we can add oursleves to the
dag.tasks
- start_date: Optional[pendulum.DateTime] = attr.ib(repr=False, default=None)
- end_date: Optional[pendulum.DateTime] = attr.ib(repr=False, default=None)
+ start_date: Optional[pendulum.DateTime] = attr.ib(default=None)
+ end_date: Optional[pendulum.DateTime] = attr.ib(default=None)
owner: str = attr.ib(repr=False, default=conf.get("operators",
"DEFAULT_OWNER"))
max_active_tis_per_dag: Optional[int] = attr.ib(default=None)
+ # Needed for SerializedBaseOperator
+ _is_dummy: bool = attr.ib()
+
+ deps: Iterable[BaseTIDep] = attr.ib()
+ operator_extra_links: Iterable['BaseOperatorLink'] = ()
+ params: Union[ParamsDict, dict] = attr.ib(factory=ParamsDict)
+ template_fields: Iterable[str] = attr.ib()
+
+ @_is_dummy.default
+ def _is_dummy_default(self):
+ from airflow.operators.dummy import DummyOperator
+
+ return issubclass(self.operator_class, DummyOperator)
+
+ @deps.default
+ def _deps_from_class(self):
+ return self.operator_class.deps
Review comment:
Yes, exactly that. I've already started thinking in my head about how to
refactor/rearchitect the serialization and deserialization.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]