This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dedbeab7eca Expanding task sdk integration tests to cover critical xcom operations (#57797)
dedbeab7eca is described below

commit dedbeab7eca59fc5233671956d8a3d928b6d787b
Author: Amogh Desai <[email protected]>
AuthorDate: Thu Nov 6 09:41:47 2025 +0530

    Expanding task sdk integration tests to cover critical xcom operations (#57797)
---
 task-sdk-tests/dags/test_dag.py                    |   9 +-
 .../tests/task_sdk_tests/test_xcom_operations.py   | 160 +++++++++++++++++++++
 2 files changed, 168 insertions(+), 1 deletion(-)

diff --git a/task-sdk-tests/dags/test_dag.py b/task-sdk-tests/dags/test_dag.py
index c8f19597ba7..0d69d551de6 100644
--- a/task-sdk-tests/dags/test_dag.py
+++ b/task-sdk-tests/dags/test_dag.py
@@ -29,6 +29,12 @@ def get_task_instance_id(ti=None):
     return str(ti.id)
 
 
+@task(dag=dag)
+def return_tuple_task(ti=None):
+    """Task that returns a tuple for testing XCom serialization/deserialization"""
+    return 1, "test_value"
+
+
 @task(dag=dag)
 def long_running_task(ti=None):
     """Long-running task that sleeps for 5 minutes to allow testing"""
@@ -42,6 +48,7 @@ def long_running_task(ti=None):
 
 
 get_ti_id = get_task_instance_id()
+tuple_task = return_tuple_task()
 long_task = long_running_task()
 
-get_ti_id >> long_task
+get_ti_id >> tuple_task >> long_task
diff --git a/task-sdk-tests/tests/task_sdk_tests/test_xcom_operations.py b/task-sdk-tests/tests/task_sdk_tests/test_xcom_operations.py
new file mode 100644
index 00000000000..75692835a70
--- /dev/null
+++ b/task-sdk-tests/tests/task_sdk_tests/test_xcom_operations.py
@@ -0,0 +1,160 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Integration tests for XCom operations for task execution time.
+
+These tests validate the Execution API endpoints for XCom operations:
+- get(): Get XCom value
+- set(): Set XCom value
+- delete(): Delete XCom value
+"""
+
+from __future__ import annotations
+
+from airflow.sdk.api.datamodels._generated import XComResponse
+from airflow.sdk.execution_time.comms import OKResponse
+from task_sdk_tests import console
+
+
+def test_get_xcom(sdk_client, dag_info):
+    """
+    Test getting existing XCom value from `return_tuple_task`.
+
+    Note: XCom APIs return data in serialized format and that is what we are testing.
+    """
+    console.print("[yellow]Getting existing XCom from return_tuple_task...")
+
+    response = sdk_client.xcoms.get(
+        dag_id=dag_info["dag_id"],
+        run_id=dag_info["dag_run_id"],
+        task_id="return_tuple_task",
+        key="return_value",
+    )
+
+    console.print(" XCom Get Response ".center(72, "="))
+    console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
+    console.print(f"[bright_blue]Key:[/] {response.key}")
+    console.print(f"[bright_blue]Value Type:[/] {type(response.value).__name__}")
+    console.print(f"[bright_blue]Value:[/] {response.value}")
+    console.print("=" * 72)
+
+    assert isinstance(response, XComResponse)
+    assert response.key == "return_value"
+    assert response.value == {
+        "__classname__": "builtins.tuple",
+        "__version__": 1,
+        "__data__": [1, "test_value"],
+    }
+    console.print("[green]✅ XCom get test passed!")
+
+
+def test_set_xcom(sdk_client, dag_info):
+    """
+    Test setting XCom value and then getting it to ensure set worked.
+    """
+    console.print("[yellow]Setting XCom value...")
+
+    test_key = "test_xcom_key"
+    test_value = {"test": "data", "number": 42}
+
+    set_response = sdk_client.xcoms.set(
+        dag_id=dag_info["dag_id"],
+        run_id=dag_info["dag_run_id"],
+        task_id="long_running_task",
+        key=test_key,
+        value=test_value,
+    )
+
+    console.print(" XCom Set Response ".center(72, "="))
+    console.print(f"[bright_blue]Response Type:[/] {type(set_response).__name__}")
+    console.print(f"[bright_blue]Status:[/] {set_response.ok}")
+    console.print("=" * 72)
+
+    assert isinstance(set_response, OKResponse)
+    assert set_response.ok is True
+
+    console.print("[yellow]Getting XCom value...")
+    get_response = sdk_client.xcoms.get(
+        dag_id=dag_info["dag_id"],
+        run_id=dag_info["dag_run_id"],
+        task_id="long_running_task",
+        key=test_key,
+    )
+
+    console.print(" XCom Get Response ".center(72, "="))
+    console.print(f"[bright_blue]Response Type:[/] {type(get_response).__name__}")
+    console.print(f"[bright_blue]Key:[/] {get_response.key}")
+    console.print(f"[bright_blue]Value:[/] {get_response.value}")
+    console.print("=" * 72)
+
+    assert isinstance(get_response, XComResponse)
+    assert get_response.key == test_key
+    assert get_response.value == test_value
+    console.print("[green]✅ XCom set and get test passed!")
+
+
+def test_xcom_delete(sdk_client, dag_info):
+    """
+    Test deleting XCom value.
+    """
+    console.print("[yellow]Deleting XCom value...")
+
+    test_key = "test_xcom_key_delete"
+
+    # Set XCom first
+    sdk_client.xcoms.set(
+        dag_id=dag_info["dag_id"],
+        run_id=dag_info["dag_run_id"],
+        task_id="long_running_task",
+        key=test_key,
+        value="to_be_deleted",
+    )
+
+    # Delete XCom
+    delete_response = sdk_client.xcoms.delete(
+        dag_id=dag_info["dag_id"],
+        run_id=dag_info["dag_run_id"],
+        task_id="long_running_task",
+        key=test_key,
+    )
+
+    console.print(" XCom Delete Response ".center(72, "="))
+    console.print(f"[bright_blue]Response Type:[/] {type(delete_response).__name__}")
+    console.print(f"[bright_blue]Status:[/] {delete_response.ok}")
+    console.print("=" * 72)
+
+    assert isinstance(delete_response, OKResponse)
+    assert delete_response.ok is True
+
+    console.print("[yellow]Verifying XCom was deleted...")
+    get_response = sdk_client.xcoms.get(
+        dag_id=dag_info["dag_id"],
+        run_id=dag_info["dag_run_id"],
+        task_id="long_running_task",
+        key=test_key,
+    )
+
+    console.print(" XCom Get After Delete ".center(72, "="))
+    console.print(f"[bright_blue]Response Type:[/] {type(get_response).__name__}")
+    console.print(f"[bright_blue]Key:[/] {get_response.key}")
+    console.print(f"[bright_blue]Value:[/] {get_response.value}")
+    console.print("=" * 72)
+
+    assert isinstance(get_response, XComResponse)
+    assert get_response.key == test_key
+    assert get_response.value is None
+    console.print("[green]✅ XCom delete test passed!")

Reply via email to