uranusjr commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786204849
##########
File path: airflow/models/baseoperator.py
##########
@@ -1706,25 +1701,49 @@ def _validate_kwarg_names_for_mapping(cls:
Type[BaseOperator], func_name: str, v
class MappedOperator(DAGNode):
"""Object representing a mapped operator in a DAG"""
- operator_class: Type[BaseOperator] = attr.ib(repr=lambda c: c.__name__)
+ def __repr__(self) -> str:
+ return (
+ f'MappedOperator(task_type={self.task_type}, '
+ + f'task_id={self.task_id!r},
partial_kwargs={self.partial_kwargs!r}, '
+ + f'mapped_kwargs={self.mapped_kwargs!r}, dag={self.dag})'
+ )
+
+ operator_class: Union[Type[BaseOperator], str]
task_type: str = attr.ib()
task_id: str
partial_kwargs: Dict[str, Any]
mapped_kwargs: Dict[str, Any] = attr.ib(
validator=lambda self, _, v:
_validate_kwarg_names_for_mapping(self.operator_class, "map", v)
)
dag: Optional["DAG"] = None
- upstream_task_ids: Set[str] = attr.ib(factory=set, repr=False)
- downstream_task_ids: Set[str] = attr.ib(factory=set, repr=False)
-
- task_group: Optional["TaskGroup"] = attr.ib(repr=False)
+ upstream_task_ids: Set[str] = attr.ib(factory=set)
+ downstream_task_ids: Set[str] = attr.ib(factory=set)
+ task_group: Optional["TaskGroup"] = attr.ib()
# BaseOperator-like interface -- needed so we can add oursleves to the
dag.tasks
- start_date: Optional[pendulum.DateTime] = attr.ib(repr=False, default=None)
- end_date: Optional[pendulum.DateTime] = attr.ib(repr=False, default=None)
+ start_date: Optional[pendulum.DateTime] = attr.ib(default=None)
+ end_date: Optional[pendulum.DateTime] = attr.ib(default=None)
owner: str = attr.ib(repr=False, default=conf.get("operators",
"DEFAULT_OWNER"))
max_active_tis_per_dag: Optional[int] = attr.ib(default=None)
+ # Needed for SerializedBaseOperator
+ _is_dummy: bool = attr.ib()
+
+ deps: Iterable[BaseTIDep] = attr.ib()
+ operator_extra_links: Iterable['BaseOperatorLink'] = ()
+ params: Union[ParamsDict, dict] = attr.ib(factory=ParamsDict)
+ template_fields: Iterable[str] = attr.ib()
+
+ @_is_dummy.default
+ def _is_dummy_default(self):
+ from airflow.operators.dummy import DummyOperator
+
+ return issubclass(self.operator_class, DummyOperator)
+
+ @deps.default
+ def _deps_from_class(self):
+ return self.operator_class.deps
Review comment:
I’m assuming many of these defaults (this one, `_is_dummy`, and
`template_fields`, I believe) don’t need to handle the case where
`operator_class` is a str, because in that case these values would’ve been
supplied explicitly instead. (It’s kind of bad that it’s designed this way, but
I guess that can be said for many things regarding the current serialisation
implementation...)
##########
File path: airflow/models/baseoperator.py
##########
@@ -1706,25 +1701,49 @@ def _validate_kwarg_names_for_mapping(cls:
Type[BaseOperator], func_name: str, v
class MappedOperator(DAGNode):
"""Object representing a mapped operator in a DAG"""
- operator_class: Type[BaseOperator] = attr.ib(repr=lambda c: c.__name__)
+ def __repr__(self) -> str:
+ return (
+ f'MappedOperator(task_type={self.task_type}, '
+ + f'task_id={self.task_id!r},
partial_kwargs={self.partial_kwargs!r}, '
+ + f'mapped_kwargs={self.mapped_kwargs!r}, dag={self.dag})'
Review comment:
```suggestion
f'MappedOperator(task_type={self.task_type}, '
f'task_id={self.task_id!r}, partial_kwargs={self.partial_kwargs!r}, '
f'mapped_kwargs={self.mapped_kwargs!r}, dag={self.dag})'
```
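Use implicit string concatenation: adjacent string literals inside the
parentheses are joined automatically, so the explicit `+` operators are
unnecessary.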
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]