uranusjr commented on code in PR #24101:
URL: https://github.com/apache/airflow/pull/24101#discussion_r891981738
##########
airflow/models/taskinstance.py:
##########
@@ -2272,19 +2271,20 @@ def _record_task_map_for_downstreams(self, task:
"Operator", value: Any, *, sess
# currently possible for a downstream to depend on one individual
mapped
# task instance, only a task as a whole. This will change in AIP-42
# Phase 2, and we'll need to further analyze the mapped task case.
- if next(task.iter_mapped_dependants(), None) is None:
- return
- if value is None:
- raise XComForMappingNotPushed()
if task.is_mapped:
+ # But we still need to check the mapped task did push something for
+ # its downstream tasks...
+ if next(task.iter_mapped_dependants(), None) is not None and value
is None:
+ raise XComForMappingNotPushed()
+ return
+
+ it1, it2 = itertools.tee(task.iter_mapped_dependants())
+ if next(it1, None) is None: # No children need to expand, don't
bother recording.
return
- if not isinstance(value, collections.abc.Collection) or
isinstance(value, (bytes, str)):
- raise UnmappableXComTypePushed(value)
- task_map = TaskMap.from_task_instance_xcom(self, value)
- max_map_length = conf.getint("core", "max_map_length", fallback=1024)
- if task_map.length > max_map_length:
- raise UnmappableXComLengthPushed(value, max_map_length)
- session.merge(task_map)
+ session.merge(TaskMap.from_task_instance_xcom(self, value, item=""))
+ if isinstance(value, dict) and any(node.should_unpack_mapped_kwargs()
for node in it2):
+ for task_map in TaskMap.from_unpacking_task_instance_xcom(self,
value):
+ session.merge(task_map)
Review Comment:
We need the item set for the “drill-down” kind of task expansion (i.e.
`expand_kwargs(xcom_arg)`, where we need to iterate against the content of
`xcom_arg`). Technically we can avoid the `item=""` case for this particular
case, but we still need it for something like
```python
t1 = ... # XComArg to {"a": [1, 2]}
t2.expand_kwargs(t1) # This expands against [1, 2].
t3.expand(x=t1) # This expands against ["a"].
```
but I figured it’s easier to simply save oen additional row than doing
another DAG traversal to figure out if there is a `t3`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]