josh-fell commented on code in PR #40172:
URL: https://github.com/apache/airflow/pull/40172#discussion_r1642835149


##########
airflow/models/xcom_arg.py:
##########
@@ -606,8 +615,78 @@ def resolve(self, context: Context, session: Session = 
NEW_SESSION) -> Any:
         return _ZipResult(values, fillvalue=self.fillvalue)
 
 
+class _ConcatResult(Sequence):
+    def __init__(self, values: Sequence[Sequence | dict]) -> None:
+        self.values = values
+
+    def __getitem__(self, index: Any) -> Any:
+        i = index
+        for value in self.values:
+            if i >= (curlen := len(value)):
+                i -= curlen
+            elif isinstance(value, Sequence):
+                return value[i]
+            else:
+                return next(itertools.islice(iter(value), i, None))
+        raise IndexError(index)
+
+    def __len__(self) -> int:
+        return sum(len(v) for v in self.values)
+
+
+class ConcatXComArg(XComArg):
+    """Concatenating multiple XCom references into one.
+
+    This is done by calling ``concat()`` on an XComArg to combine it with
+    others. The effect is similar to Python's :func:`itertools.chain`, but the
+    return value also supports index access.
+    """
+
+    def __init__(self, args: Sequence[XComArg]) -> None:
+        if not args:
+            raise ValueError("At least one input is required")
+        self.args = args
+
+    def __repr__(self) -> str:
+        args_iter = iter(self.args)
+        first = repr(next(args_iter))
+        rest = ", ".join(repr(arg) for arg in args_iter)
+        return f"{first}.concat({rest})"
+
+    def _serialize(self) -> dict[str, Any]:
+        return {"args": [serialize_xcom_arg(arg) for arg in self.args]}
+
+    @classmethod
+    def _deserialize(cls, data: dict[str, Any], dag: DAG) -> XComArg:
+        return cls([deserialize_xcom_arg(arg, dag) for arg in data["args"]])
+
+    def iter_references(self) -> Iterator[tuple[Operator, str]]:
+        for arg in self.args:
+            yield from arg.iter_references()
+
+    def concat(self, *others: XComArg) -> ConcatXComArg:
+        # Flattern foo.concat(x).concat(y) into one call.

Review Comment:
   ```suggestion
           # Flatten foo.concat(x).concat(y) into one call.
   ```
   Small typo here.
   
   Relatedly, would it also be useful to explicitly call out this functionality 
in the documentation too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to