kaxil commented on code in PR #45509:
URL: https://github.com/apache/airflow/pull/45509#discussion_r1908774093
##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -213,40 +213,48 @@ def xcom_pull(
run_id = self.run_id
if task_ids is None:
+ # default to the current task if not provided
task_ids = self.task_id
elif not isinstance(task_ids, str) and isinstance(task_ids, Iterable):
- # TODO: Handle multiple task_ids or remove support
- raise NotImplementedError("Multiple task_ids are not supported
yet")
-
+ # Retain the ordering as per legacy
+ task_ids = list(task_ids)
if map_indexes is None:
map_indexes = self.map_index
elif isinstance(map_indexes, Iterable):
# TODO: Handle multiple map_indexes or remove support
raise NotImplementedError("Multiple map_indexes are not supported
yet")
log = structlog.get_logger(logger_name="task")
- SUPERVISOR_COMMS.send_request(
- log=log,
- msg=GetXCom(
- key=key,
- dag_id=dag_id,
- task_id=task_ids,
- run_id=run_id,
- map_index=map_indexes,
- ),
- )
-
- msg = SUPERVISOR_COMMS.get_message()
- if TYPE_CHECKING:
- assert isinstance(msg, XComResult)
- if msg.value is not None:
- from airflow.models.xcom import XCom
+ xcoms = []
+ for t in task_ids:
Review Comment:
Could you add it please
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]