insomnes commented on code in PR #46728:
URL: https://github.com/apache/airflow/pull/46728#discussion_r1975397441


##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -289,27 +296,46 @@ def xcom_pull(
 
         if task_ids is None:
             # default to the current task if not provided
-            task_ids = self.task_id
+            task_ids = [self.task_id]
         elif isinstance(task_ids, str):
             task_ids = [task_ids]
+
+        map_indexes_iterable: Iterable[int | None] = []
+        # If map_indexes is not provided, default to use the map_index of the calling task
         if isinstance(map_indexes, ArgNotSet):
-            map_indexes = self.map_index
-        elif isinstance(map_indexes, Iterable):
-            # TODO: Handle multiple map_indexes or remove support
-            raise NotImplementedError("Multiple map_indexes are not supported yet")
+            map_indexes_iterable = [self.map_index]
+        elif isinstance(map_indexes, int) or map_indexes is None:
+            map_indexes_iterable = [map_indexes]
+        else:
+            map_indexes_iterable = map_indexes
 
         log = structlog.get_logger(logger_name="task")
 
         xcoms = []
-        for t in task_ids:
+        # TODO: Execution API only allows working with a single map_index at a time
+        # this is inefficient and leads to task_id * map_index requests to the API.
+        # And we can't achieve the original behavior of XCom pull with multiple tasks
+        # directly now.
+        # Original behavior may be achieved after `LazyXComSequence` is finished?
+        #
+        # Original description:
+        #
+        # When pulling one single task (``task_id`` is *None* or a str) without
+        # specifying ``map_indexes``, the return value is inferred from whether
+        # the specified task is mapped. If not, value from the one single task
+        # instance is returned. If the task to pull is mapped, an iterator (not a
+        # list) yielding XComs from mapped task instances is returned. In either
+        # case, ``default`` (*None* if not specified) is returned if no matching
+        # XComs are found.
+        for t_id, m_idx in product(task_ids, map_indexes_iterable):

Review Comment:
   Correct me if I am looking at the wrong API, but the `get_xcom` method here:
   
https://github.com/apache/airflow/blob/main/airflow/api_fastapi/execution_api/routes/xcoms.py#L117
   
   returns only one XCom result, both by its signature and, as far as I can tell from reading the code:
   ```python
   # The xcom_query allows no map_index to be passed. This endpoint should always return just a single item,
   # so we override that query value
   xcom_query = xcom_query.filter(BaseXCom.map_index == map_index)
   # We use `BaseXCom.get_many` to fetch XComs directly from the database, bypassing the XCom Backend.
   # This avoids deserialization via the backend (e.g., from a remote storage like S3) and instead
   # retrieves the raw serialized value from the database. By not relying on `XCom.get_many` or `XCom.get_one`
   # (which automatically deserializes using the backend), we avoid potential
   # performance hits from retrieving large data files into the API server.
   result = xcom_query.limit(1).first()
   ```
   
   Also, `XComResult` and `XComResponse` are single-item models. Am I missing something? Could the JSON contain multiple items? My manual tests showed that, with the current API call, not providing a map index was not a working strategy, at least back then. I will recheck.
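
Since each call returns a single item, the diff above has to fan out over every `(task_id, map_index)` pair. Below is a minimal, self-contained sketch of that normalization and the resulting request count. `_ArgNotSet`, `NOTSET` and `normalize_pull_targets` are hypothetical stand-ins, not Airflow's actual `ArgNotSet` sentinel or task-runner code.

```python
from __future__ import annotations

from collections.abc import Iterable
from itertools import product


class _ArgNotSet:
    """Hypothetical stand-in for Airflow's ArgNotSet sentinel type."""


NOTSET = _ArgNotSet()


def normalize_pull_targets(
    task_ids: str | Iterable[str] | None,
    map_indexes: int | Iterable[int] | None | _ArgNotSet,
    current_task_id: str,
    current_map_index: int | None,
) -> list[tuple[str, int | None]]:
    """Return every (task_id, map_index) pair that needs its own single-item request."""
    if task_ids is None:
        # Default to the calling task.
        task_id_list = [current_task_id]
    elif isinstance(task_ids, str):
        task_id_list = [task_ids]
    else:
        task_id_list = list(task_ids)

    if isinstance(map_indexes, _ArgNotSet):
        # Default to the calling task's own map_index.
        map_index_list: list[int | None] = [current_map_index]
    elif map_indexes is None or isinstance(map_indexes, int):
        map_index_list = [map_indexes]
    else:
        map_index_list = list(map_indexes)

    # One API call per pair: len(task_ids) * len(map_indexes) requests in total.
    return list(product(task_id_list, map_index_list))


pairs = normalize_pull_targets(["extract", "load"], [0, 1, 2], "current", None)
print(len(pairs))  # 6
```

Two tasks with three map indexes each already means six round trips, which is the inefficiency the TODO in the diff points at.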
   
   >  not passing a map index as "return all XComs for mapped tis"
   
   This behavior is available in `get_xcom_entries` from the public XCom API, and I was targeting it in my initial version, but after testing I realized that a different API is used here and I cannot rely on it:
   
https://github.com/apache/airflow/blob/main/airflow/api_fastapi/core_api/routes/public/xcom.py#L109
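
To make the question concrete, here is a hedged, in-memory sketch of the "original" semantics quoted in the diff comment (one task, no `map_indexes`). The dict-based store and `pull_single_task` are hypothetical; the sketch assumes unmapped task instances are stored under `map_index=-1`, as in Airflow's metadata DB. It does not reflect how the Execution API behaves today.

```python
from __future__ import annotations

from typing import Any

# Hypothetical in-memory XCom store keyed by (task_id, map_index).
# Unmapped task instances use map_index -1, mapped ones use 0..N-1.
XComStore = dict[tuple[str, int], Any]


def pull_single_task(store: XComStore, task_id: str, default: Any = None) -> Any:
    """Sketch of xcom_pull(task_ids=<one task>) without map_indexes."""
    indexes = sorted(m for (t, m) in store if t == task_id)
    if not indexes:
        # No matching XComs at all: fall back to ``default``.
        return default
    if indexes == [-1]:
        # Unmapped task: return the single value directly.
        return store[(task_id, -1)]
    # Mapped task: return a lazy iterator (not a list) over the mapped values.
    return (store[(task_id, m)] for m in indexes)


store = {("extract", -1): "raw", ("transform", 0): 10, ("transform", 1): 20}
print(pull_single_task(store, "extract"))  # raw
print(list(pull_single_task(store, "transform")))  # [10, 20]
print(pull_single_task(store, "missing", default="n/a"))  # n/a
```

Reproducing the mapped branch requires knowing which map indexes exist for the task, which a single-item endpoint keyed by one `map_index` cannot tell us.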
   
   
   Can you guide me here: what behavior is expected from this method? I will try to make it work via `LazySequence` if possible.
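
A rough sketch of what a `LazySequence`-style return value could look like on top of the current single-item endpoint. `LazyMappedXComs`, `fetch_one` and the `length` argument (the number of mapped task instances, assumed to be known up front) are all hypothetical; this is not the actual `LazyXComSequence` implementation.

```python
from __future__ import annotations

from collections.abc import Callable, Sequence
from typing import Any


class LazyMappedXComs(Sequence):
    """Hypothetical lazy view over the XComs of one mapped task.

    Each item is fetched on first access via ``fetch_one(task_id, map_index)``,
    which stands in for a single-item Execution API call; results are cached.
    """

    def __init__(self, task_id: str, length: int, fetch_one: Callable[[str, int], Any]) -> None:
        self._task_id = task_id
        self._length = length
        self._fetch_one = fetch_one
        self._cache: dict[int, Any] = {}

    def __len__(self) -> int:
        return self._length

    def __getitem__(self, index):
        if isinstance(index, slice):
            return [self[i] for i in range(*index.indices(self._length))]
        if index < 0:
            index += self._length
        if not 0 <= index < self._length:
            raise IndexError(index)
        if index not in self._cache:
            # One request per map_index, issued only when the item is read.
            self._cache[index] = self._fetch_one(self._task_id, index)
        return self._cache[index]


calls = []


def fake_fetch(task_id: str, map_index: int) -> int:
    """Hypothetical fetcher that records each simulated API request."""
    calls.append((task_id, map_index))
    return map_index * 10


xcoms = LazyMappedXComs("transform", 3, fake_fetch)
print(xcoms[1])  # 10
print(list(xcoms))  # [0, 10, 20]
print(len(calls))  # 3, because index 1 was served from the cache
```

It still costs one request per map index, but only for items that are actually read. A real implementation would presumably also need a way to learn `length` (and which indexes have XComs) from the server.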


