kaxil commented on code in PR #45509:
URL: https://github.com/apache/airflow/pull/45509#discussion_r1908771044


##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -213,40 +213,48 @@ def xcom_pull(
             run_id = self.run_id
 
         if task_ids is None:
+            # default to the current task if not provided
             task_ids = self.task_id
         elif not isinstance(task_ids, str) and isinstance(task_ids, Iterable):
-            # TODO: Handle multiple task_ids or remove support
-            raise NotImplementedError("Multiple task_ids are not supported 
yet")
-
+            # Retain the ordering as per legacy
+            task_ids = list(task_ids)
         if map_indexes is None:
             map_indexes = self.map_index
         elif isinstance(map_indexes, Iterable):
             # TODO: Handle multiple map_indexes or remove support
             raise NotImplementedError("Multiple map_indexes are not supported 
yet")
 
         log = structlog.get_logger(logger_name="task")
-        SUPERVISOR_COMMS.send_request(
-            log=log,
-            msg=GetXCom(
-                key=key,
-                dag_id=dag_id,
-                task_id=task_ids,
-                run_id=run_id,
-                map_index=map_indexes,
-            ),
-        )
-
-        msg = SUPERVISOR_COMMS.get_message()
-        if TYPE_CHECKING:
-            assert isinstance(msg, XComResult)
 
-        if msg.value is not None:
-            from airflow.models.xcom import XCom
+        xcoms = []
+        for t in task_ids:
+            SUPERVISOR_COMMS.send_request(
+                log=log,
+                msg=GetXCom(
+                    key=key,
+                    dag_id=dag_id,
+                    task_id=t,
+                    run_id=run_id,
+                    map_index=map_indexes,
+                ),
+            )
 
-            # TODO: Move XCom serialization & deserialization to Task SDK
-            #   https://github.com/apache/airflow/issues/45231
-            return XCom.deserialize_value(msg)  # type: ignore[arg-type]
-        return default
+            msg = SUPERVISOR_COMMS.get_message()
+            if TYPE_CHECKING:
+                assert isinstance(msg, XComResult)

Review Comment:
   I think `ruff` complains about using `assert`, so @amoghrajesh you will have 
to change this to:
   
   ```
   if not isinstance(msg, XComResult):
       raise TypeError(f"Expected XComResult, received: {type(msg)} {msg}")
   ```
   
   Problem with `assert`:
   ```
   task_sdk/src/airflow/sdk/execution_time/task_runner.py:241:9: S101 Use of 
`assert` detected
       |
   239 |         msg = SUPERVISOR_COMMS.get_message()
   240 |         # if TYPE_CHECKING:
   241 |         assert isinstance(msg, XComResult)
       |         ^^^^^^ S101
   242 |
   243 |         if msg.value is not None:
       |
   
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to