uranusjr commented on code in PR #27876:
URL: https://github.com/apache/airflow/pull/27876#discussion_r1031050752


##########
airflow/utils/task_group.py:
##########
@@ -507,6 +508,15 @@ class MappedTaskGroup(TaskGroup):
     def __init__(self, *, expand_input: ExpandInput, **kwargs: Any) -> None:
         super().__init__(**kwargs)
         self._expand_input = expand_input
+        for op, _ in expand_input.iter_references():
+            self.set_upstream(op)
+
+    def iter_mapped_dependencies(self) -> Iterator[Operator]:
+        """Upstream dependencies that provide XComs used by this mapped task 
group."""
+        from airflow.models.xcom_arg import XComArg
+
+        for op, _ in XComArg.iter_xcom_references(self._expand_input):
+            yield op

Review Comment:
   Implemented to match MappedOperator, so `iter_mapped_dependants` can stay 
unchanged (except the type hints).



##########
airflow/utils/task_group.py:
##########
@@ -507,6 +508,15 @@ class MappedTaskGroup(TaskGroup):
     def __init__(self, *, expand_input: ExpandInput, **kwargs: Any) -> None:
         super().__init__(**kwargs)
         self._expand_input = expand_input
+        for op, _ in expand_input.iter_references():
+            self.set_upstream(op)

Review Comment:
   This is the key fix.



##########
airflow/jobs/backfill_job.py:
##########
@@ -271,10 +271,20 @@ def _manage_executor_state(
                 self.log.error(msg)
                 ti.handle_failure(error=msg)
                 continue
+
+            def _iter_task_needing_expansion() -> Iterator[AbstractOperator]:
+                from airflow.models.mappedoperator import AbstractOperator
+
+                for node in self.dag.get_task(ti.task_id, 
include_subdags=True).iter_mapped_dependants():
+                    if isinstance(node, AbstractOperator):
+                        yield node
+                    else:  # A (mapped) task group. All its children need 
expansion.
+                        yield from node.iter_tasks()
+
             if ti.state not in self.STATES_COUNT_AS_RUNNING:
                 # Don't use ti.task; if this task is mapped, that attribute
                 # would hold the unmapped task. We need to original task here.
-                for node in self.dag.get_task(ti.task_id, 
include_subdags=True).iter_mapped_dependants():
+                for node in _iter_task_needing_expansion():

Review Comment:
   Bug caught by cascaded type hint updates.



##########
airflow/models/taskinstance.py:
##########
@@ -2226,18 +2239,14 @@ def _record_task_map_for_downstreams(self, task: 
Operator, value: Any, *, sessio
             return
         # TODO: We don't push TaskMap for mapped task instances because it's 
not
         # currently possible for a downstream to depend on one individual 
mapped
-        # task instance. This will change when we implement task group mapping,
-        # and we'll need to further analyze the mapped task case.
+        # task instance. This will change when we implement task mapping inside
+        # a mapped task group, and we'll need to further analyze the case.

Review Comment:
   This comment is fixed to reflect the up-to-date exact reason.



##########
airflow/models/taskinstance.py:
##########
@@ -2226,18 +2239,14 @@ def _record_task_map_for_downstreams(self, task: 
Operator, value: Any, *, sessio
             return
         # TODO: We don't push TaskMap for mapped task instances because it's 
not
         # currently possible for a downstream to depend on one individual 
mapped
-        # task instance. This will change when we implement task group mapping,
-        # and we'll need to further analyze the mapped task case.
+        # task instance. This will change when we implement task mapping inside
+        # a mapped task group, and we'll need to further analyze the case.
         if task.is_mapped:
             return
         if value is None:
             raise XComForMappingNotPushed()
-        if not isinstance(value, (collections.abc.Sequence, dict)):
-            raise UnmappableXComTypePushed(value)
-        if isinstance(value, (bytes, str)):
+        if not _is_mappable_value(value):
             raise UnmappableXComTypePushed(value)
-        if TYPE_CHECKING:  # The isinstance() checks above guard this.
-            assert isinstance(value, collections.abc.Collection)

Review Comment:
   Just some refactorings to use PEP 647 TypeGuard.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to