Copilot commented on code in PR #59104:
URL: https://github.com/apache/airflow/pull/59104#discussion_r2880077985
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
Review Comment:
In the single-task mapped-XCom branch, `order_by` is passed as
`[XComModel.map_index.asc()]`, but `LazySelectSequence.from_select()` already
applies `.asc()`/`.desc()` to each element in `order_by`. Passing an
already-ordered expression here is redundant and stacks a second direction
modifier on top of the first (for example `.asc().desc()` for the reversed
query), which can compile to an invalid ORDER BY clause. Pass the bare column
expression (`XComModel.map_index`) instead, for consistency with the
multi-value path below.
```suggestion
order_by=[XComModel.map_index],
```
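The double application described in this comment can be sketched without
SQLAlchemy. This is a minimal stdlib model, not Airflow's code: `build_queries`
is a hypothetical helper that, like the behavior attributed to `from_select()`
above, appends its own direction to every `order_by` term. The table layout and
the use of SQLite are assumptions made only for illustration.

```python
import sqlite3


def build_queries(base_sql, order_by):
    """Hypothetical stand-in: append ASC / DESC to each ORDER BY term."""
    asc_sql = base_sql + " ORDER BY " + ", ".join(f"{term} ASC" for term in order_by)
    desc_sql = base_sql + " ORDER BY " + ", ".join(f"{term} DESC" for term in order_by)
    return asc_sql, desc_sql


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE xcom (map_index INTEGER, value INTEGER)")
conn.executemany("INSERT INTO xcom VALUES (?, ?)", [(1, 4), (0, 2)])

base = "SELECT value FROM xcom"

# Bare column: the helper adds exactly one direction per query.
asc_sql, desc_sql = build_queries(base, ["map_index"])
forward = [row[0] for row in conn.execute(asc_sql)]
backward = [row[0] for row in conn.execute(desc_sql)]

# Pre-ordered term: the helper stacks DESC on top of the caller's ASC.
_, stacked_sql = build_queries(base, ["map_index ASC"])
try:
    conn.execute(stacked_sql)
    stacked_error = None
except sqlite3.OperationalError as exc:
    stacked_error = str(exc)
```

With the bare column both queries run and differ only in direction; with the
pre-ordered term the reversed query ends in `map_index ASC DESC`, which SQLite
rejects as a syntax error.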
##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
Review Comment:
The regression described in #59008 is specifically about mapped tasks
producing a different return type when there is exactly one mapped instance.
This test only creates XComs for map_index 0 and 1; it would be stronger (and
closer to the reported bug) to also cover the single-instance case (only
map_index=0) and assert the returned lazy sequence yields the expected values
(e.g. `list(result) == [...]`) in map_index order.
```suggestion
)
# The lazy sequence should yield values in map_index order.
assert list(result) == [2, 4]
# Also cover the single-instance mapped case (only map_index=0).
dag_run_single = dag_maker.create_dagrun(logical_date=timezone.utcnow())
base_ti_single = dag_run_single.get_task_instance("upstream", session=session)
base_task_single = dag_maker.dag.get_task("upstream")
ti_single = TaskInstance(
    base_task_single,
    run_id=dag_run_single.run_id,
    map_index=0,
    dag_version_id=base_ti_single.dag_version_id,
)
session.add(ti_single)
ti_single.dag_run = dag_run_single
session.flush()
XComModel.set(
    key=XCOM_RETURN_KEY,
    value=3,
    dag_id=dag_run_single.dag_id,
    task_id="upstream",
    run_id=dag_run_single.run_id,
    map_index=0,
    session=session,
)
ti_downstream_single = dag_run_single.get_task_instance("downstream", session=session)
ti_downstream_single.task = dag_maker.dag.task_dict["downstream"]
result_single = ti_downstream_single.xcom_pull(task_ids="upstream", session=session)
assert isinstance(result_single, LazyXComSelectSequence), (
    f"Expected LazyXComSelectSequence for single mapped XCom, got {type(result_single)}"
)
assert list(result_single) == [3]
```
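The contract this test is meant to pin down can be shown in isolation. This is
a minimal sketch under stated assumptions, not Airflow's implementation:
`pull_mapped` is a hypothetical function that takes `(map_index, value)` pairs
and must return a sequence in map_index order whether there are several mapped
instances or exactly one.

```python
from collections.abc import Sequence


def pull_mapped(rows):
    """Hypothetical stand-in for pulling XComs from a mapped upstream task.

    Always returns a tuple ordered by map_index, even for a single row,
    so callers never have to special-case the one-instance result.
    """
    ordered = sorted(rows, key=lambda row: row[0])
    return tuple(value for _, value in ordered)


# Two mapped instances, deliberately supplied out of order.
two = pull_mapped([(1, 4), (0, 2)])

# Exactly one mapped instance: still a sequence, not a bare value.
one = pull_mapped([(0, 3)])
```

Asserting on both the type and the contents, as the suggestion above does,
catches a regression where the single-instance case collapses to a scalar.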