jason810496 commented on code in PR #59104:
URL: https://github.com/apache/airflow/pull/59104#discussion_r2638787096


##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -1386,6 +1386,100 @@ def test_xcom_push_flag(self, dag_maker):
         ti.run()
         assert ti.xcom_pull(task_ids=task_id) is None
 
+    def test_xcom_pull_unmapped_task(self, dag_maker, session):
+        """
+        Test that xcom_pull from unmapped task returns single deserialized 
value.
+
+        For unmapped tasks with map_index < 0, xcom_pull should return the 
single value,
+        not a LazyXComSelectSequence.
+        """
+        from airflow.models.xcom import XComModel
+
+        with dag_maker(dag_id="test_xcom_unmapped"):
+            upstream = PythonOperator(
+                task_id="unmapped_task",
+                python_callable=lambda: {"key": "value"},
+            )
+            downstream = PythonOperator(
+                task_id="downstream",
+                python_callable=lambda: None,
+            )
+            upstream >> downstream
+
+        dag_run = dag_maker.create_dagrun(logical_date=timezone.utcnow())
+
+        # Get task instances
+        ti_upstream = dag_run.get_task_instance("unmapped_task")
+        ti_downstream = dag_run.get_task_instance("downstream")
+
+        # Set task references
+        ti_upstream.task = dag_maker.dag.task_dict["unmapped_task"]
+        ti_downstream.task = dag_maker.dag.task_dict["downstream"]
+
+        # Push xcom value
+        XComModel.set(
+            key="result",
+            value={"key": "value"},
+            task_id="unmapped_task",
+            dag_id="test_xcom_unmapped",
+            run_id=dag_run.run_id,
+            map_index=-1,
+            session=session,
+        )
+        session.commit()

Review Comment:
   Instead of calling `XComModel.set` ourselves, would it be better to call 
`dag_maker.run_ti` or `ti.run()` to test the expected behavior.
   
   
   Here is the test we could take example from:
   
https://github.com/apache/airflow/blob/d0bd2df6d194a8b923fb55b2f37107642c261b40/airflow-core/tests/unit/models/test_taskinstance.py#L3047-L3048
   
   



##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -1386,6 +1386,100 @@ def test_xcom_push_flag(self, dag_maker):
         ti.run()
         assert ti.xcom_pull(task_ids=task_id) is None
 
+    def test_xcom_pull_unmapped_task(self, dag_maker, session):
+        """
+        Test that xcom_pull from unmapped task returns single deserialized 
value.
+
+        For unmapped tasks with map_index < 0, xcom_pull should return the 
single value,
+        not a LazyXComSelectSequence.
+        """
+        from airflow.models.xcom import XComModel

Review Comment:
   The `XComModel` is already imported at top level
   
   ```suggestion
   ```



##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -1386,6 +1386,100 @@ def test_xcom_push_flag(self, dag_maker):
         ti.run()
         assert ti.xcom_pull(task_ids=task_id) is None
 
+    def test_xcom_pull_unmapped_task(self, dag_maker, session):
+        """
+        Test that xcom_pull from unmapped task returns single deserialized 
value.
+
+        For unmapped tasks with map_index < 0, xcom_pull should return the 
single value,
+        not a LazyXComSelectSequence.
+        """
+        from airflow.models.xcom import XComModel
+
+        with dag_maker(dag_id="test_xcom_unmapped"):
+            upstream = PythonOperator(
+                task_id="unmapped_task",
+                python_callable=lambda: {"key": "value"},
+            )
+            downstream = PythonOperator(
+                task_id="downstream",
+                python_callable=lambda: None,
+            )
+            upstream >> downstream
+
+        dag_run = dag_maker.create_dagrun(logical_date=timezone.utcnow())
+
+        # Get task instances
+        ti_upstream = dag_run.get_task_instance("unmapped_task")
+        ti_downstream = dag_run.get_task_instance("downstream")
+
+        # Set task references
+        ti_upstream.task = dag_maker.dag.task_dict["unmapped_task"]
+        ti_downstream.task = dag_maker.dag.task_dict["downstream"]
+
+        # Push xcom value
+        XComModel.set(
+            key="result",
+            value={"key": "value"},
+            task_id="unmapped_task",
+            dag_id="test_xcom_unmapped",
+            run_id=dag_run.run_id,
+            map_index=-1,
+            session=session,
+        )
+        session.commit()
+
+        # Pull xcom - should return single dict value, not 
LazyXComSelectSequence
+        result = ti_downstream.xcom_pull(task_ids="unmapped_task", 
key="result", session=session)
+        assert isinstance(result, dict), f"Expected dict for unmapped task, 
got {type(result)}"
+        assert result == {"key": "value"}
+
+    def test_xcom_pull_mapped_task(self, dag_maker, session):

Review Comment:
   Same suggestion for the second test method.



##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -1386,6 +1386,100 @@ def test_xcom_push_flag(self, dag_maker):
         ti.run()
         assert ti.xcom_pull(task_ids=task_id) is None
 
+    def test_xcom_pull_unmapped_task(self, dag_maker, session):

Review Comment:
   Would `class TestMappedTaskInstanceReceiveValue:` be a better place to put 
the test case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to