uranusjr commented on code in PR #25176:
URL: https://github.com/apache/airflow/pull/25176#discussion_r932049350


##########
airflow/models/xcom_arg.py:
##########
@@ -379,13 +388,96 @@ def get_task_map_length(self, run_id: str, *, session: 
"Session") -> Optional[in
     @provide_session
     def resolve(self, context: Context, session: "Session" = NEW_SESSION) -> 
Any:
         value = self.arg.resolve(context, session=session)
-        assert isinstance(value, (Sequence, dict))  # Validation was done when 
XCom was pushed.
+        if not isinstance(value, (Sequence, dict)):
+            raise ValueError(f"XCom map expects sequence or dict, not 
{type(value).__name__}")
         return _MapResult(value, self.callables)
 
 
+class _ZipResult(Sequence):
+    def __init__(self, values: Sequence[Union[Sequence, dict]], *, fillvalue: 
Any = NOTSET) -> None:
+        self.values = values
+        self.fillvalue = fillvalue
+
+    @staticmethod
+    def _get_or_fill(container: Union[Sequence, dict], index: Any, fillvalue: 
Any) -> Any:
+        try:
+            return container[index]
+        except (IndexError, KeyError):
+            return fillvalue
+
+    def __getitem__(self, index: Any) -> Any:
+        if index >= len(self):
+            raise IndexError(index)
+        return tuple(self._get_or_fill(value, index, self.fillvalue) for value 
in self.values)
+
+    def __len__(self) -> int:
+        lengths = (len(v) for v in self.values)
+        if self.fillvalue is NOTSET:
+            return min(lengths)
+        return max(lengths)
+
+
+class ZipXComArg(XComArg):
+    """An XCom reference with ``zip()`` applied.
+
+    This is constructed from multiple XComArg instances, and presents an
+    iterable that "zips" them together like the built-in ``zip()`` (and
+    ``itertools.zip_longest()`` if ``fillvalue`` is provided).
+    """
+
+    def __init__(self, args: Sequence[XComArg], *, fillvalue: Any = NOTSET) -> 
None:
+        if not args:
+            raise ValueError("At least one input is required")
+        self.args = args
+        self.fillvalue = fillvalue
+
+    def __repr__(self) -> str:
+        args_iter = iter(self.args)
+        first = repr(next(args_iter))
+        rest = ", ".join(repr(arg) for arg in args_iter)
+        if self.fillvalue is NOTSET:
+            return f"{first}.zip({rest})"
+        return f"{first}.zip({rest}, fillvalue={self.fillvalue!r})"
+
+    def _serialize(self) -> Dict[str, Any]:
+        args = [serialize_xcom_arg(arg) for arg in self.args]
+        if self.fillvalue is NOTSET:
+            return {"args": args}
+        return {"args": args, "fillvalue": self.fillvalue}
+
+    @classmethod
+    def _deserialize(cls, data: Dict[str, Any], dag: "DAG") -> XComArg:
+        return cls(
+            [deserialize_xcom_arg(arg, dag) for arg in data["args"]],
+            fillvalue=data.get("fillvalue", NOTSET),
+        )
+
+    def iter_references(self) -> Iterator[Tuple["Operator", str]]:
+        for arg in self.args:
+            yield from arg.iter_references()
+
+    def get_task_map_length(self, run_id: str, *, session: "Session") -> 
Optional[int]:
+        all_lengths = (arg.get_task_map_length(run_id, session=session) for 
arg in self.args)
+        ready_lengths = [length for length in all_lengths if length is not 
None]
+        if len(ready_lengths) != len(self.args):
+            return None  # If any of the referenced XComs is not ready, we are 
not ready either.

Review Comment:
   Yes, #24338 will require changing this, but I want to make that change in the 
PR fixing the issue.
   
   Returning None here means “I don’t know how many tasks the downstream needs 
to be expanded into”, so `fillvalue` is not relevant here. Until all referenced 
XComs are available (or we know they won’t be, to address #24338), we can’t know 
how long the zipped result is (because it can still be either shorter or 
longer), and thus cannot decide how many tasks are needed. So I think this is 
the correct logic (although yes, we need to change the `if` clause when fixing 
#24338).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to