ashb commented on code in PR #24101:
URL: https://github.com/apache/airflow/pull/24101#discussion_r891174401
##########
airflow/models/taskinstance.py:
##########
@@ -2272,19 +2271,20 @@ def _record_task_map_for_downstreams(self, task:
"Operator", value: Any, *, sess
# currently possible for a downstream to depend on one individual mapped
# task instance, only a task as a whole. This will change in AIP-42
# Phase 2, and we'll need to further analyze the mapped task case.
- if next(task.iter_mapped_dependants(), None) is None:
- return
- if value is None:
- raise XComForMappingNotPushed()
if task.is_mapped:
+ # But we still need to check the mapped task did push something for
+ # its downstream tasks...
+ if next(task.iter_mapped_dependants(), None) is not None and value
is None:
+ raise XComForMappingNotPushed()
+ return
+
+ it1, it2 = itertools.tee(task.iter_mapped_dependants())
+ if next(it1, None) is None: # No children need to expand, don't bother recording.
return
- if not isinstance(value, collections.abc.Collection) or
isinstance(value, (bytes, str)):
- raise UnmappableXComTypePushed(value)
- task_map = TaskMap.from_task_instance_xcom(self, value)
- max_map_length = conf.getint("core", "max_map_length", fallback=1024)
- if task_map.length > max_map_length:
- raise UnmappableXComLengthPushed(value, max_map_length)
- session.merge(task_map)
+ session.merge(TaskMap.from_task_instance_xcom(self, value, item=""))
+ if isinstance(value, dict) and any(node.should_unpack_mapped_kwargs()
for node in it2):
+ for task_map in TaskMap.from_unpacking_task_instance_xcom(self,
value):
+ session.merge(task_map)
Review Comment:
Why do we create both a task map with `item=""` and task maps with `item` set to something here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]