uranusjr commented on a change in pull request #19965:
URL: https://github.com/apache/airflow/pull/19965#discussion_r771473841



##########
File path: airflow/models/taskmixin.py
##########
@@ -88,3 +99,167 @@ def __init_subclass__(cls) -> None:
             stacklevel=2,
         )
         return super().__init_subclass__()
+
+
+class DAGNode(DependencyMixin):
+    """
+    A base class for a node in the graph of a workflow -- an Operator or a Task Group, either mapped or
+    unmapped.
+    """
+
+    dag: Optional["DAG"] = None
+
+    @property
+    @abstractmethod

Review comment:
       I think this is useless because `DAGNode` (and its parent 
`DependencyMixin`) does not use `ABCMeta`?
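
A minimal sketch of the point above, using hypothetical stand-in classes (`PlainNode`, `AbstractNode`) rather than Airflow's real ones: `@abstractmethod` is only enforced at instantiation time when the class's metaclass is `ABCMeta` (for example by inheriting from `abc.ABC`). Without it, the decorator has no effect beyond documentation.

```python
from abc import ABC, abstractmethod


class PlainNode:
    """Hypothetical class with no ABCMeta: @abstractmethod is not enforced."""

    @property
    @abstractmethod
    def node_id(self) -> str:
        raise NotImplementedError()


class AbstractNode(ABC):
    """Hypothetical class using ABCMeta via abc.ABC: instantiation is blocked."""

    @property
    @abstractmethod
    def node_id(self) -> str:
        raise NotImplementedError()


# Instantiating the plain class works even though node_id is "abstract".
plain = PlainNode()

# Instantiating the ABC-based class raises TypeError.
try:
    AbstractNode()
except TypeError as exc:
    abc_error = exc
else:
    abc_error = None
```

In the plain case the missing override only surfaces when `node_id` is accessed and `NotImplementedError` is raised, not when the object is created.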

##########
File path: airflow/utils/task_group.py
##########
@@ -348,10 +382,35 @@ def build_map(task_group):
         build_map(self)
         return task_group_map
 
-    def get_child_by_label(self, label: str) -> Union["BaseOperator", "TaskGroup"]:
+    def get_child_by_label(self, label: str) -> DAGNode:
         """Get a child task/TaskGroup by its label (i.e. task_id/group_id 
without the group_id prefix)"""
         return self.children[self.child_id(label)]
 
+    def map(self, arg: Iterable) -> "MappedTaskGroup":
+        if self.children:
+            raise RuntimeError("Cannot map a TaskGroup that already has children")
+        if not self.group_id:
+            raise RuntimeError("Cannot map a TaskGroup before it has a group_id")
+        if self._parent_group:
+            self._parent_group._remove(self)
+        tg = MappedTaskGroup(self._group_id)
+        tg.mapped_arg = arg
+        tg.mapped_kwargs = {}
+        tg.partial_kwargs = {}

Review comment:
       Any reason these aren’t in `MappedTaskGroup`’s `__init__`?
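
One way this could look, as a rough sketch only: the classes below (`TaskGroupSketch`, `MappedTaskGroupSketch`) are hypothetical stand-ins, and the constructor signature is an assumption, not the PR's actual API. The idea is that the mapped group sets its own attributes in `__init__`, so `map()` does not have to assign them from outside.

```python
from typing import Any, Dict, Iterable, Optional


class TaskGroupSketch:
    """Hypothetical, heavily simplified stand-in for TaskGroup."""

    def __init__(self, group_id: Optional[str]) -> None:
        self.group_id = group_id
        self.children: Dict[str, Any] = {}

    def map(self, arg: Iterable) -> "MappedTaskGroupSketch":
        if self.children:
            raise RuntimeError("Cannot map a TaskGroup that already has children")
        if not self.group_id:
            raise RuntimeError("Cannot map a TaskGroup before it has a group_id")
        # All mapping state is handed to the constructor in one place.
        return MappedTaskGroupSketch(self.group_id, mapped_arg=arg)


class MappedTaskGroupSketch(TaskGroupSketch):
    """Hypothetical stand-in for MappedTaskGroup that owns its own state."""

    def __init__(
        self,
        group_id: Optional[str],
        mapped_arg: Iterable,
        mapped_kwargs: Optional[Dict[str, Any]] = None,
        partial_kwargs: Optional[Dict[str, Any]] = None,
    ) -> None:
        super().__init__(group_id)
        self.mapped_arg = mapped_arg
        # Copy into fresh dicts so instances never share mutable defaults.
        self.mapped_kwargs = dict(mapped_kwargs or {})
        self.partial_kwargs = dict(partial_kwargs or {})


mapped = TaskGroupSketch("group").map([1, 2, 3])
```

With this shape, every `MappedTaskGroupSketch` instance is guaranteed to have the three attributes, regardless of which code path created it.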

##########
File path: airflow/models/taskmixin.py
##########
@@ -88,3 +99,167 @@ def __init_subclass__(cls) -> None:
             stacklevel=2,
         )
         return super().__init_subclass__()
+
+
+class DAGNode(DependencyMixin):
+    """
+    A base class for a node in the graph of a workflow -- an Operator or a Task Group, either mapped or
+    unmapped.
+    """
+
+    dag: Optional["DAG"] = None
+
+    @property
+    @abstractmethod
+    def node_id(self) -> str:
+        raise NotImplementedError()
+
+    task_group: Optional["TaskGroup"]
+    """The task_group that contains this node"""
+
+    start_date: Optional[pendulum.DateTime]
+    end_date: Optional[pendulum.DateTime]
+
+    def has_dag(self) -> bool:
+        return self.dag is not None
+
+    @property
+    @abstractmethod
+    def upstream_task_ids(self) -> Set[str]:
+        raise NotImplementedError()
+
+    @property
+    @abstractmethod
+    def downstream_task_ids(self) -> Set[str]:
+        raise NotImplementedError()
+
+    @property
+    def log(self) -> "Logger":
+        raise NotImplementedError()
+
+    @property
+    @abstractmethod
+    def roots(self) -> Sequence["DAGNode"]:
+        raise NotImplementedError()
+
+    @property
+    @abstractmethod
+    def leaves(self) -> Sequence["DAGNode"]:
+        raise NotImplementedError()
+
+    def _set_relatives(
+        self,
+        task_or_task_list: Union[DependencyMixin, Sequence[DependencyMixin]],
+        upstream: bool = False,
+        edge_modifier: Optional["EdgeModifier"] = None,
+    ) -> None:
+        """Sets relatives for the task or task list."""
+        from airflow.models.baseoperator import BaseOperator, MappedOperator
+
+        if not isinstance(task_or_task_list, Sequence):
+            task_or_task_list = [task_or_task_list]
+
+        task_list: List[DAGNode] = []
+        for task_object in task_or_task_list:

Review comment:
       Is Mypy OK with this? I’d make `_set_relatives()` only accept the 
sequence case, and do the coercing outside in the public functions.

##########
File path: airflow/utils/task_group.py
##########
@@ -135,8 +147,9 @@ def __init__(
         # so that we can optimize the number of edges when entire TaskGroups depend on each other.
         self.upstream_group_ids: Set[Optional[str]] = set()
         self.downstream_group_ids: Set[Optional[str]] = set()
-        self.upstream_task_ids: Set[Optional[str]] = set()
-        self.downstream_task_ids: Set[Optional[str]] = set()
+        # Since the parent class defines these as read-only properties, we can't just do `self.x = ...`
+        self.__dict__['upstream_task_ids'] = set()
+        self.__dict__['downstream_task_ids'] = set()

Review comment:
       Can we not change the parent class definition to be plain class-level 
variable declarations?
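
To illustrate the difference, here is a stand-alone sketch with hypothetical classes (none of these are Airflow's real definitions). A `property` without a setter rejects plain assignment, and because a property is a data descriptor it also takes precedence over an entry written directly into the instance `__dict__`. A bare class-level annotation creates no class attribute at all, so subclasses can assign normally.

```python
from typing import Set


class PropertyParent:
    """Hypothetical parent that declares the attribute as a read-only property."""

    @property
    def upstream_task_ids(self) -> Set[str]:
        raise NotImplementedError()


class AnnotatedParent:
    """Hypothetical parent that only annotates the attribute."""

    # Annotation only: nothing is stored on the class itself.
    upstream_task_ids: Set[str]


class PropertyChild(PropertyParent):
    pass


class AnnotatedChild(AnnotatedParent):
    def __init__(self) -> None:
        self.upstream_task_ids = set()  # plain assignment just works


child = PropertyChild()

# Assigning through a setter-less property raises AttributeError.
try:
    child.upstream_task_ids = set()
except AttributeError:
    assignment_blocked = True
else:
    assignment_blocked = False

# Writing into __dict__ succeeds, but attribute lookup still goes through
# the property, since data descriptors win over the instance dictionary.
child.__dict__["upstream_task_ids"] = {"a"}
try:
    child.upstream_task_ids
except NotImplementedError:
    still_uses_property = True
else:
    still_uses_property = False

annotated = AnnotatedChild()
annotated.upstream_task_ids.add("a")
```

The annotation-only form still tells type checkers which attributes every node is expected to have, without needing a workaround in subclasses.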




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
