jason810496 commented on code in PR #59104:
URL: https://github.com/apache/airflow/pull/59104#discussion_r2638787096
##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -1386,6 +1386,100 @@ def test_xcom_push_flag(self, dag_maker):
ti.run()
assert ti.xcom_pull(task_ids=task_id) is None
+ def test_xcom_pull_unmapped_task(self, dag_maker, session):
+ """
+ Test that xcom_pull from unmapped task returns single deserialized
value.
+
+ For unmapped tasks with map_index < 0, xcom_pull should return the
single value,
+ not a LazyXComSelectSequence.
+ """
+ from airflow.models.xcom import XComModel
+
+ with dag_maker(dag_id="test_xcom_unmapped"):
+ upstream = PythonOperator(
+ task_id="unmapped_task",
+ python_callable=lambda: {"key": "value"},
+ )
+ downstream = PythonOperator(
+ task_id="downstream",
+ python_callable=lambda: None,
+ )
+ upstream >> downstream
+
+ dag_run = dag_maker.create_dagrun(logical_date=timezone.utcnow())
+
+ # Get task instances
+ ti_upstream = dag_run.get_task_instance("unmapped_task")
+ ti_downstream = dag_run.get_task_instance("downstream")
+
+ # Set task references
+ ti_upstream.task = dag_maker.dag.task_dict["unmapped_task"]
+ ti_downstream.task = dag_maker.dag.task_dict["downstream"]
+
+ # Push xcom value
+ XComModel.set(
+ key="result",
+ value={"key": "value"},
+ task_id="unmapped_task",
+ dag_id="test_xcom_unmapped",
+ run_id=dag_run.run_id,
+ map_index=-1,
+ session=session,
+ )
+ session.commit()
Review Comment:
Instead of calling `XComModel.set` ourselves, would it be better to call
`dag_maker.run_ti` or `ti.run()` to test the expected behavior?
Here is a test we could use as an example:
https://github.com/apache/airflow/blob/d0bd2df6d194a8b923fb55b2f37107642c261b40/airflow-core/tests/unit/models/test_taskinstance.py#L3047-L3048
##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -1386,6 +1386,100 @@ def test_xcom_push_flag(self, dag_maker):
ti.run()
assert ti.xcom_pull(task_ids=task_id) is None
+ def test_xcom_pull_unmapped_task(self, dag_maker, session):
+ """
+ Test that xcom_pull from unmapped task returns single deserialized
value.
+
+ For unmapped tasks with map_index < 0, xcom_pull should return the
single value,
+ not a LazyXComSelectSequence.
+ """
+ from airflow.models.xcom import XComModel
Review Comment:
`XComModel` is already imported at the top level.
```suggestion
```
##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -1386,6 +1386,100 @@ def test_xcom_push_flag(self, dag_maker):
ti.run()
assert ti.xcom_pull(task_ids=task_id) is None
+ def test_xcom_pull_unmapped_task(self, dag_maker, session):
+ """
+ Test that xcom_pull from unmapped task returns single deserialized
value.
+
+ For unmapped tasks with map_index < 0, xcom_pull should return the
single value,
+ not a LazyXComSelectSequence.
+ """
+ from airflow.models.xcom import XComModel
+
+ with dag_maker(dag_id="test_xcom_unmapped"):
+ upstream = PythonOperator(
+ task_id="unmapped_task",
+ python_callable=lambda: {"key": "value"},
+ )
+ downstream = PythonOperator(
+ task_id="downstream",
+ python_callable=lambda: None,
+ )
+ upstream >> downstream
+
+ dag_run = dag_maker.create_dagrun(logical_date=timezone.utcnow())
+
+ # Get task instances
+ ti_upstream = dag_run.get_task_instance("unmapped_task")
+ ti_downstream = dag_run.get_task_instance("downstream")
+
+ # Set task references
+ ti_upstream.task = dag_maker.dag.task_dict["unmapped_task"]
+ ti_downstream.task = dag_maker.dag.task_dict["downstream"]
+
+ # Push xcom value
+ XComModel.set(
+ key="result",
+ value={"key": "value"},
+ task_id="unmapped_task",
+ dag_id="test_xcom_unmapped",
+ run_id=dag_run.run_id,
+ map_index=-1,
+ session=session,
+ )
+ session.commit()
+
+ # Pull xcom - should return single dict value, not
LazyXComSelectSequence
+ result = ti_downstream.xcom_pull(task_ids="unmapped_task",
key="result", session=session)
+ assert isinstance(result, dict), f"Expected dict for unmapped task,
got {type(result)}"
+ assert result == {"key": "value"}
+
+ def test_xcom_pull_mapped_task(self, dag_maker, session):
Review Comment:
Same suggestion for the second test method.
##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -1386,6 +1386,100 @@ def test_xcom_push_flag(self, dag_maker):
ti.run()
assert ti.xcom_pull(task_ids=task_id) is None
+ def test_xcom_pull_unmapped_task(self, dag_maker, session):
Review Comment:
Would `class TestMappedTaskInstanceReceiveValue:` be a better place to put
the test case?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]