Copilot commented on code in PR #59104:
URL: https://github.com/apache/airflow/pull/59104#discussion_r2880077985


##########
airflow-core/src/airflow/models/taskinstance.py:
##########


Review Comment:
   In the single-task mapped-XCom branch, `order_by` is passed as 
`[XComModel.map_index.asc()]`, but `LazySelectSequence.from_select()` already 
applies `.asc()`/`.desc()` to each element in `order_by`. Passing an 
already-ordered expression here is redundant and can lead to awkward/invalid 
ORDER BY expressions depending on SQLAlchemy compilation. Pass the column 
expression (`XComModel.map_index`) instead for consistency with the multi-value 
path below.
   ```suggestion
                   order_by=[XComModel.map_index],
   ```



##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########


Review Comment:
   The regression described in #59008 is specifically about mapped tasks 
producing a different return type when there is exactly one mapped instance. 
This test only creates XComs for map_index 0 and 1; it would be stronger (and 
closer to the reported bug) to also cover the single-instance case (only 
map_index=0) and assert the returned lazy sequence yields the expected values 
(e.g. `list(result) == [...]`) in map_index order.
   ```suggestion
           )
           # The lazy sequence should yield values in map_index order.
           assert list(result) == [2, 4]
   
           # Also cover the single-instance mapped case (only map_index=0).
           dag_run_single = 
dag_maker.create_dagrun(logical_date=timezone.utcnow())
   
           base_ti_single = dag_run_single.get_task_instance("upstream", 
session=session)
           base_task_single = dag_maker.dag.get_task("upstream")
           ti_single = TaskInstance(
               base_task_single,
               run_id=dag_run_single.run_id,
               map_index=0,
               dag_version_id=base_ti_single.dag_version_id,
           )
           session.add(ti_single)
           ti_single.dag_run = dag_run_single
           session.flush()
   
           XComModel.set(
               key=XCOM_RETURN_KEY,
               value=3,
               dag_id=dag_run_single.dag_id,
               task_id="upstream",
               run_id=dag_run_single.run_id,
               map_index=0,
               session=session,
           )
   
           ti_downstream_single = 
dag_run_single.get_task_instance("downstream", session=session)
           ti_downstream_single.task = dag_maker.dag.task_dict["downstream"]
   
           result_single = ti_downstream_single.xcom_pull(task_ids="upstream", 
session=session)
           assert isinstance(result_single, LazyXComSelectSequence), (
               f"Expected LazyXComSelectSequence for single mapped XCom, got 
{type(result_single)}"
           )
           assert list(result_single) == [3]
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to