This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dedbeab7eca Expanding task sdk integration tests to cover critical xcom operations (#57797)
dedbeab7eca is described below
commit dedbeab7eca59fc5233671956d8a3d928b6d787b
Author: Amogh Desai <[email protected]>
AuthorDate: Thu Nov 6 09:41:47 2025 +0530
Expanding task sdk integration tests to cover critical xcom operations (#57797)
---
task-sdk-tests/dags/test_dag.py | 9 +-
.../tests/task_sdk_tests/test_xcom_operations.py | 160 +++++++++++++++++++++
2 files changed, 168 insertions(+), 1 deletion(-)
diff --git a/task-sdk-tests/dags/test_dag.py b/task-sdk-tests/dags/test_dag.py
index c8f19597ba7..0d69d551de6 100644
--- a/task-sdk-tests/dags/test_dag.py
+++ b/task-sdk-tests/dags/test_dag.py
@@ -29,6 +29,12 @@ def get_task_instance_id(ti=None):
return str(ti.id)
+@task(dag=dag)
+def return_tuple_task(ti=None):
+ """Task that returns a tuple for testing XCom serialization/deserialization"""
+ return 1, "test_value"
+
+
@task(dag=dag)
def long_running_task(ti=None):
"""Long-running task that sleeps for 5 minutes to allow testing"""
@@ -42,6 +48,7 @@ def long_running_task(ti=None):
get_ti_id = get_task_instance_id()
+tuple_task = return_tuple_task()
long_task = long_running_task()
-get_ti_id >> long_task
+get_ti_id >> tuple_task >> long_task
diff --git a/task-sdk-tests/tests/task_sdk_tests/test_xcom_operations.py b/task-sdk-tests/tests/task_sdk_tests/test_xcom_operations.py
new file mode 100644
index 00000000000..75692835a70
--- /dev/null
+++ b/task-sdk-tests/tests/task_sdk_tests/test_xcom_operations.py
@@ -0,0 +1,160 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Integration tests for XCom operations for task execution time.
+
+These tests validate the Execution API endpoints for XCom operations:
+- get(): Get XCom value
+- set(): Set XCom value
+- delete(): Delete XCom value
+"""
+
+from __future__ import annotations
+
+from airflow.sdk.api.datamodels._generated import XComResponse
+from airflow.sdk.execution_time.comms import OKResponse
+from task_sdk_tests import console
+
+
+def test_get_xcom(sdk_client, dag_info):
+ """
+ Test getting existing XCom value from `return_tuple_task`.
+
+ Note: XCom APIs return data in serialized format and that is what we are testing.
+ """
+ console.print("[yellow]Getting existing XCom from return_tuple_task...")
+
+ response = sdk_client.xcoms.get(
+ dag_id=dag_info["dag_id"],
+ run_id=dag_info["dag_run_id"],
+ task_id="return_tuple_task",
+ key="return_value",
+ )
+
+ console.print(" XCom Get Response ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
+ console.print(f"[bright_blue]Key:[/] {response.key}")
+ console.print(f"[bright_blue]Value Type:[/] {type(response.value).__name__}")
+ console.print(f"[bright_blue]Value:[/] {response.value}")
+ console.print("=" * 72)
+
+ assert isinstance(response, XComResponse)
+ assert response.key == "return_value"
+ assert response.value == {
+ "__classname__": "builtins.tuple",
+ "__version__": 1,
+ "__data__": [1, "test_value"],
+ }
+ console.print("[green]✅ XCom get test passed!")
+
+
+def test_set_xcom(sdk_client, dag_info):
+ """
+ Test setting XCom value and then getting it to ensure set worked.
+ """
+ console.print("[yellow]Setting XCom value...")
+
+ test_key = "test_xcom_key"
+ test_value = {"test": "data", "number": 42}
+
+ set_response = sdk_client.xcoms.set(
+ dag_id=dag_info["dag_id"],
+ run_id=dag_info["dag_run_id"],
+ task_id="long_running_task",
+ key=test_key,
+ value=test_value,
+ )
+
+ console.print(" XCom Set Response ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/] {type(set_response).__name__}")
+ console.print(f"[bright_blue]Status:[/] {set_response.ok}")
+ console.print("=" * 72)
+
+ assert isinstance(set_response, OKResponse)
+ assert set_response.ok is True
+
+ console.print("[yellow]Getting XCom value...")
+ get_response = sdk_client.xcoms.get(
+ dag_id=dag_info["dag_id"],
+ run_id=dag_info["dag_run_id"],
+ task_id="long_running_task",
+ key=test_key,
+ )
+
+ console.print(" XCom Get Response ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/] {type(get_response).__name__}")
+ console.print(f"[bright_blue]Key:[/] {get_response.key}")
+ console.print(f"[bright_blue]Value:[/] {get_response.value}")
+ console.print("=" * 72)
+
+ assert isinstance(get_response, XComResponse)
+ assert get_response.key == test_key
+ assert get_response.value == test_value
+ console.print("[green]✅ XCom set and get test passed!")
+
+
+def test_xcom_delete(sdk_client, dag_info):
+ """
+ Test deleting XCom value.
+ """
+ console.print("[yellow]Deleting XCom value...")
+
+ test_key = "test_xcom_key_delete"
+
+ # Set XCom first
+ sdk_client.xcoms.set(
+ dag_id=dag_info["dag_id"],
+ run_id=dag_info["dag_run_id"],
+ task_id="long_running_task",
+ key=test_key,
+ value="to_be_deleted",
+ )
+
+ # Delete XCom
+ delete_response = sdk_client.xcoms.delete(
+ dag_id=dag_info["dag_id"],
+ run_id=dag_info["dag_run_id"],
+ task_id="long_running_task",
+ key=test_key,
+ )
+
+ console.print(" XCom Delete Response ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/] {type(delete_response).__name__}")
+ console.print(f"[bright_blue]Status:[/] {delete_response.ok}")
+ console.print("=" * 72)
+
+ assert isinstance(delete_response, OKResponse)
+ assert delete_response.ok is True
+
+ console.print("[yellow]Verifying XCom was deleted...")
+ get_response = sdk_client.xcoms.get(
+ dag_id=dag_info["dag_id"],
+ run_id=dag_info["dag_run_id"],
+ task_id="long_running_task",
+ key=test_key,
+ )
+
+ console.print(" XCom Get After Delete ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/] {type(get_response).__name__}")
+ console.print(f"[bright_blue]Key:[/] {get_response.key}")
+ console.print(f"[bright_blue]Value:[/] {get_response.value}")
+ console.print("=" * 72)
+
+ assert isinstance(get_response, XComResponse)
+ assert get_response.key == test_key
+ assert get_response.value is None
+ console.print("[green]✅ XCom delete test passed!")