amoghrajesh commented on code in PR #46728:
URL: https://github.com/apache/airflow/pull/46728#discussion_r1992853742
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -275,12 +276,17 @@ def xcom_pull(
If *None* (default), the run_id of the calling task is used.
When pulling one single task (``task_id`` is *None* or a str) without
- specifying ``map_indexes``, the return value is inferred from whether
- the specified task is mapped. If not, value from the one single task
- instance is returned. If the task to pull is mapped, an iterator (not a
- list) yielding XComs from mapped task instances is returned. In either
- case, ``default`` (*None* if not specified) is returned if no matching
- XComs are found.
+ specifying ``map_indexes``, the return value is a single XCom entry
+ (map_indexes is set to map_index of the calling task instance).
+
+ When pulling task is mapped the specified ``map_index`` is used, so by default
+ pulling on mapped task will result in no matching XComs if the task instance
+ of the method call is not mapped. Otherwise the map_index of the calling task
Review Comment:
```suggestion
of the method call is not mapped. Otherwise, the map_index of the calling task
```
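The revised docstring in this hunk says that when ``map_indexes`` is omitted, the caller's own ``map_index`` is used as the filter. Here is a minimal in-memory sketch of that lookup rule. It assumes XComs are keyed by ``(task_id, map_index)`` and that an unmapped task instance has ``map_index == -1`` (Airflow's convention). ``XCOM_STORE`` and ``pull`` are hypothetical names, not the Task SDK API:

```python
from __future__ import annotations

from typing import Any

# Hypothetical in-memory XCom store keyed by (task_id, map_index).
# Assumption: an unmapped task instance uses map_index -1.
XCOM_STORE: dict[tuple[str, int], Any] = {
    ("extract", -1): "unmapped value",
    ("transform", 0): "mapped value 0",
    ("transform", 1): "mapped value 1",
}


def pull(task_id: str, caller_map_index: int, default: Any = None) -> Any:
    """Return the one XCom whose map_index equals the caller's own map_index."""
    return XCOM_STORE.get((task_id, caller_map_index), default)


print(pull("extract", caller_map_index=-1))  # unmapped value
# Unmapped caller pulling a mapped task: no entry at index -1, so default.
print(pull("transform", caller_map_index=-1))  # None
# Mapped caller at index 1 pulls the sibling entry with the same index.
print(pull("transform", caller_map_index=1))  # mapped value 1
```

This shows the consequence the docstring warns about. An unmapped caller that pulls from a mapped upstream gets ``default`` unless it passes ``map_indexes`` explicitly.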
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -296,24 +302,48 @@ def xcom_pull(
task_ids = [self.task_id]
elif isinstance(task_ids, str):
task_ids = [task_ids]
+
+ map_indexes_iterable: Iterable[int | None] = []
+ # If map_indexes is not provided, default to use the map_index of the calling task
if isinstance(map_indexes, ArgNotSet):
- map_indexes = self.map_index
+ map_indexes_iterable = [self.map_index]
+ elif isinstance(map_indexes, int) or map_indexes is None:
+ map_indexes_iterable = [map_indexes]
elif isinstance(map_indexes, Iterable):
- # TODO: Handle multiple map_indexes or remove support
- raise NotImplementedError("Multiple map_indexes are not supported yet")
+ map_indexes_iterable = map_indexes
+ else:
+ raise TypeError(
+ "map_indexes can be omitted or must be an int, an iterable of ints, "
+ f"or None, got {type(map_indexes)}"
+ )
log = structlog.get_logger(logger_name="task")
xcoms = []
- for t in task_ids:
+ # TODO: Execution API only allows working with a single map_index at a time
+ # this is inefficient and leads to task_id * map_index requests to the API.
+ # And we can't achieve the original behavior of XCom pull with multiple tasks
+ # directly now.
+ # Original behavior may be achieved after `LazyXComSequence` is finished?
+ #
+ # Original description:
+ #
+ # When pulling one single task (``task_id`` is *None* or a str) without
+ # specifying ``map_indexes``, the return value is inferred from whether
+ # the specified task is mapped. If not, value from the one single task
+ # instance is returned. If the task to pull is mapped, an iterator (not a
+ # list) yielding XComs from mapped task instances is returned. In either
+ # case, ``default`` (*None* if not specified) is returned if no matching
+ # XComs are found.
Review Comment:
Is this meant to go?
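The ``map_indexes`` normalization in this hunk can be sketched as a standalone function. ``ArgNotSet`` below is a local stand-in class rather than Airflow's import, and ``normalize_map_indexes`` is a hypothetical helper name. The branch order and error message follow the diff:

```python
from __future__ import annotations

from collections.abc import Iterable


class ArgNotSet:
    """Local stand-in for Airflow's ArgNotSet sentinel type (assumption)."""


NOTSET = ArgNotSet()


def normalize_map_indexes(
    map_indexes: int | Iterable[int | None] | None | ArgNotSet,
    caller_map_index: int,
) -> list[int | None]:
    """Hypothetical helper: turn the map_indexes argument into a list of indexes."""
    if isinstance(map_indexes, ArgNotSet):
        # Omitted: default to the calling task instance's own map_index.
        return [caller_map_index]
    if isinstance(map_indexes, int) or map_indexes is None:
        # A single index (or None) is wrapped so the caller can always iterate.
        return [map_indexes]
    if isinstance(map_indexes, Iterable):
        # Materialize so a one-shot generator survives repeated iteration.
        # As written, a str would also pass this check, since str is Iterable.
        return list(map_indexes)
    raise TypeError(
        "map_indexes can be omitted or must be an int, an iterable of ints, "
        f"or None, got {type(map_indexes)}"
    )


print(normalize_map_indexes(NOTSET, caller_map_index=3))  # [3]
print(normalize_map_indexes(None, caller_map_index=3))  # [None]
print(normalize_map_indexes(range(2), caller_map_index=3))  # [0, 1]
```

Calling ``list(...)`` departs slightly from the diff, which assigns the iterable directly. The difference only matters if the loop over ``task_ids`` re-iterates the indexes for every task, which is an assumption about the loop body.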
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -296,24 +302,48 @@ def xcom_pull(
task_ids = [self.task_id]
elif isinstance(task_ids, str):
task_ids = [task_ids]
+
+ map_indexes_iterable: Iterable[int | None] = []
+ # If map_indexes is not provided, default to use the map_index of the
calling task
if isinstance(map_indexes, ArgNotSet):
- map_indexes = self.map_index
+ map_indexes_iterable = [self.map_index]
+ elif isinstance(map_indexes, int) or map_indexes is None:
+ map_indexes_iterable = [map_indexes]
elif isinstance(map_indexes, Iterable):
- # TODO: Handle multiple map_indexes or remove support
- raise NotImplementedError("Multiple map_indexes are not supported
yet")
+ map_indexes_iterable = map_indexes
+ else:
+ raise TypeError(
+ "map_indexes can be omitted or must be an int, an iterable of ints, "
+ f"or None, got {type(map_indexes)}"
+ )
Review Comment:
Yeah, with the right indent.
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -296,24 +302,48 @@ def xcom_pull(
task_ids = [self.task_id]
elif isinstance(task_ids, str):
task_ids = [task_ids]
+
+ map_indexes_iterable: Iterable[int | None] = []
+ # If map_indexes is not provided, default to use the map_index of the
calling task
if isinstance(map_indexes, ArgNotSet):
- map_indexes = self.map_index
+ map_indexes_iterable = [self.map_index]
+ elif isinstance(map_indexes, int) or map_indexes is None:
+ map_indexes_iterable = [map_indexes]
elif isinstance(map_indexes, Iterable):
- # TODO: Handle multiple map_indexes or remove support
- raise NotImplementedError("Multiple map_indexes are not supported
yet")
+ map_indexes_iterable = map_indexes
+ else:
+ raise TypeError(
+ "map_indexes can be omitted or must be an int, an iterable of ints, "
+ f"or None, got {type(map_indexes)}"
+ )
Review Comment:
nit:
```suggestion
raise TypeError(
f"Invalid type for map_indexes: expected int, iterable of ints, or None, got {type(map_indexes)}"
)
```
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -296,24 +302,48 @@ def xcom_pull(
task_ids = [self.task_id]
elif isinstance(task_ids, str):
task_ids = [task_ids]
+
+ map_indexes_iterable: Iterable[int | None] = []
+ # If map_indexes is not provided, default to use the map_index of the
calling task
if isinstance(map_indexes, ArgNotSet):
- map_indexes = self.map_index
+ map_indexes_iterable = [self.map_index]
+ elif isinstance(map_indexes, int) or map_indexes is None:
+ map_indexes_iterable = [map_indexes]
elif isinstance(map_indexes, Iterable):
- # TODO: Handle multiple map_indexes or remove support
- raise NotImplementedError("Multiple map_indexes are not supported
yet")
+ map_indexes_iterable = map_indexes
+ else:
+ raise TypeError(
+ "map_indexes can be omitted or must be an int, an iterable of
ints, "
+ f"or None, got {type(map_indexes)}"
+ )
log = structlog.get_logger(logger_name="task")
xcoms = []
- for t in task_ids:
+ # TODO: Execution API only allows working with a single map_index at a time
Review Comment:
```suggestion
# TODO: AIP 72 Execution API only allows working with a single map_index at a time
```
We are adding the "AIP 72" tag here, as we have done in multiple places in the repo; it makes these TODOs easier to grep for later.
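The TODO notes that the Execution API accepts only one ``map_index`` per call, so a pull fans out into one request per ``(task_id, map_index)`` pair. The sketch below shows that loop under stated assumptions. ``fetch_xcom`` is a hypothetical stand-in for the per-call API request, not the real client method, and ``pull_all`` is an illustrative name. A fake fetcher counts calls to make the ``task_id * map_index`` cost visible:

```python
from __future__ import annotations

from typing import Any, Callable


def pull_all(
    task_ids: list[str],
    map_indexes: list[int | None],
    fetch_xcom: Callable[[str, int | None], Any],
) -> list[Any]:
    """Issue one (hypothetical) API request per (task_id, map_index) pair."""
    xcoms = []
    for task_id in task_ids:
        for map_index in map_indexes:
            # One round trip per pair: len(task_ids) * len(map_indexes) in total.
            xcoms.append(fetch_xcom(task_id, map_index))
    return xcoms


calls: list[tuple[str, int | None]] = []


def fake_fetch(task_id: str, map_index: int | None) -> str:
    """Record each simulated request and return a label for it."""
    calls.append((task_id, map_index))
    return f"{task_id}[{map_index}]"


result = pull_all(["a", "b"], [0, 1, 2], fake_fetch)
print(len(calls))  # 6
print(result)  # ['a[0]', 'a[1]', 'a[2]', 'b[0]', 'b[1]', 'b[2]']
```

Two task IDs and three map indexes already cost six round trips. A batched endpoint, or something like the ``LazyXComSequence`` mentioned in the TODO, would be needed to avoid this.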
--
This is an automated message from the Apache Git Service.