uranusjr commented on code in PR #40172:
URL: https://github.com/apache/airflow/pull/40172#discussion_r1642862399


##########
airflow/models/xcom_arg.py:
##########
@@ -606,8 +615,78 @@ def resolve(self, context: Context, session: Session = 
NEW_SESSION) -> Any:
         return _ZipResult(values, fillvalue=self.fillvalue)
 
 
+class _ConcatResult(Sequence):
+    def __init__(self, values: Sequence[Sequence | dict]) -> None:
+        self.values = values
+
+    def __getitem__(self, index: Any) -> Any:
+        i = index
+        for value in self.values:
+            if i >= (curlen := len(value)):
+                i -= curlen
+            elif isinstance(value, Sequence):
+                return value[i]
+            else:
+                return next(itertools.islice(iter(value), i, None))
+        raise IndexError(index)
+
+    def __len__(self) -> int:
+        return sum(len(v) for v in self.values)
+
+
+class ConcatXComArg(XComArg):
+    """Concatenating multiple XCom references into one.
+
+    This is done by calling ``concat()`` on an XComArg to combine it with
+    others. The effect is similar to Python's :func:`itertools.chain`, but the
+    return value also supports index access.
+    """
+
+    def __init__(self, args: Sequence[XComArg]) -> None:
+        if not args:
+            raise ValueError("At least one input is required")
+        self.args = args
+
+    def __repr__(self) -> str:
+        args_iter = iter(self.args)
+        first = repr(next(args_iter))
+        rest = ", ".join(repr(arg) for arg in args_iter)
+        return f"{first}.concat({rest})"
+
+    def _serialize(self) -> dict[str, Any]:
+        return {"args": [serialize_xcom_arg(arg) for arg in self.args]}
+
+    @classmethod
+    def _deserialize(cls, data: dict[str, Any], dag: DAG) -> XComArg:
+        return cls([deserialize_xcom_arg(arg, dag) for arg in data["args"]])
+
+    def iter_references(self) -> Iterator[tuple[Operator, str]]:
+        for arg in self.args:
+            yield from arg.iter_references()
+
+    def concat(self, *others: XComArg) -> ConcatXComArg:
+        # Flattern foo.concat(x).concat(y) into one call.

Review Comment:
   The functionality is exactly the same from the user’s perspective if this 
isn’t done. Calling out such optimisation seems awkward to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to