uranusjr commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r783657603



##########
File path: airflow/models/baseoperator.py
##########
@@ -1632,6 +1632,33 @@ def defer(
     def map(self, **kwargs) -> "MappedOperator":
         return MappedOperator.from_operator(self, kwargs)
 
+    def has_mapped_dependants(self) -> bool:
+        """Whether any downstream dependencies depend on this task for 
mapping."""
+        from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+        if not self.has_dag():
+            return False
+
+        def _walk_group(group: TaskGroup) -> Iterable[Tuple[str, DAGNode]]:
+            """Recursively walk children in a task group.
+
+            This yields all direct children (including both tasks and task
+            groups), and all children of any task groups.
+            """
+            for key, child in group.children.items():
+                yield key, child
+                if isinstance(child, TaskGroup):
+                    yield from _walk_group(child)
+
+        for key, child in _walk_group(self.dag.task_group):
+            if key == self.task_id:
+                continue
+            if not isinstance(child, (MappedOperator, MappedTaskGroup)):
+                continue
+            if self.task_id in child.upstream_task_ids:
+                return True
+        return False

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to