amoghrajesh commented on code in PR #58638:
URL: https://github.com/apache/airflow/pull/58638#discussion_r2573655447
##########
task-sdk-integration-tests/tests/task_sdk_tests/test_xcom_operations.py:
##########
@@ -174,61 +192,226 @@ def test_xcom_delete(sdk_client, dag_info):
console.print("[green]✅ XCom delete test passed!")
[email protected](reason="TODO: Implement XCom head test")
-def test_xcom_head(sdk_client, dag_info):
+def test_xcom_head_unmapped(sdk_client, dag_info):
+ """
+ Test getting count of mapped XCom values.
Review Comment:
```suggestion
Test getting count of unmapped XCom values.
```
##########
task-sdk-integration-tests/tests/task_sdk_tests/test_xcom_operations.py:
##########
@@ -174,61 +192,226 @@ def test_xcom_delete(sdk_client, dag_info):
console.print("[green]✅ XCom delete test passed!")
[email protected](reason="TODO: Implement XCom head test")
-def test_xcom_head(sdk_client, dag_info):
+def test_xcom_head_unmapped(sdk_client, dag_info):
+ """
+ Test getting count of mapped XCom values.
+
+ Expected: XComCountResponse with len field in it (should be ideally equal
to number of unmapped tasks, since we have None it might throw RuntimeError)
+ Endpoint: HEAD /execution/xcoms/{dag_id}/{run_id}/{task_id}/{key}
+ """
+ console.print("[yellow]Testing XCom head for non-mapped task...")
+
+ response_single = sdk_client.xcoms.head(
+ dag_id=dag_info["dag_id"],
+ run_id=dag_info["dag_run_id"],
+ task_id="return_tuple_task",
+ key="return_value",
+ )
+
+ console.print(" XCom Head Response (Non-Mapped) ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/]
{type(response_single).__name__}")
+ console.print(f"[bright_blue]Count:[/] {response_single.len}")
+ console.print("=" * 72)
+
+ assert isinstance(response_single, XComCountResponse)
+ assert response_single.len == 1
+
+ console.print("[green]✅ XCom head non-mapped test passed!")
+
+
+def test_xcom_head_mapped(sdk_client, dag_info):
"""
Test getting count of mapped XCom values.
Expected: XComCountResponse with len field in it (should be ideally equal
to number of mapped tasks, since we have None it might throw RuntimeError)
Endpoint: HEAD /execution/xcoms/{dag_id}/{run_id}/{task_id}/{key}
"""
- console.print("[yellow]TODO: Implement test_xcom_head")
- raise NotImplementedError("test_xcom_head not implemented")
+ console.print("[yellow]Testing XCom head for mapped task...")
+
+ response_mapped = sdk_client.xcoms.head(
+ dag_id=dag_info["dag_id"],
+ run_id=dag_info["dag_run_id"],
+ task_id="mapped_task",
+ key="return_value",
+ )
+
+ console.print(" XCom Head Response (Mapped) ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/]
{type(response_mapped).__name__}")
+ console.print(f"[bright_blue]Count:[/] {response_mapped.len}")
+ console.print("=" * 72)
+
+ assert isinstance(response_mapped, XComCountResponse)
+ assert response_mapped.len == 4
+ console.print("[green]✅ XCom head mapped test passed!")
[email protected](reason="TODO: Implement XCom get_sequence_item test")
Review Comment:
We could just use pytest.parameterize:
https://docs.pytest.org/en/stable/example/parametrize.html?
##########
task-sdk-integration-tests/tests/task_sdk_tests/test_xcom_operations.py:
##########
@@ -174,61 +192,226 @@ def test_xcom_delete(sdk_client, dag_info):
console.print("[green]✅ XCom delete test passed!")
[email protected](reason="TODO: Implement XCom head test")
-def test_xcom_head(sdk_client, dag_info):
+def test_xcom_head_unmapped(sdk_client, dag_info):
+ """
+ Test getting count of mapped XCom values.
+
+ Expected: XComCountResponse with len field in it (should be ideally equal
to number of unmapped tasks, since we have None it might throw RuntimeError)
+ Endpoint: HEAD /execution/xcoms/{dag_id}/{run_id}/{task_id}/{key}
+ """
+ console.print("[yellow]Testing XCom head for non-mapped task...")
+
+ response_single = sdk_client.xcoms.head(
+ dag_id=dag_info["dag_id"],
+ run_id=dag_info["dag_run_id"],
+ task_id="return_tuple_task",
+ key="return_value",
+ )
+
+ console.print(" XCom Head Response (Non-Mapped) ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/]
{type(response_single).__name__}")
+ console.print(f"[bright_blue]Count:[/] {response_single.len}")
+ console.print("=" * 72)
+
+ assert isinstance(response_single, XComCountResponse)
+ assert response_single.len == 1
+
+ console.print("[green]✅ XCom head non-mapped test passed!")
+
+
+def test_xcom_head_mapped(sdk_client, dag_info):
"""
Test getting count of mapped XCom values.
Expected: XComCountResponse with len field in it (should be ideally equal
to number of mapped tasks, since we have None it might throw RuntimeError)
Endpoint: HEAD /execution/xcoms/{dag_id}/{run_id}/{task_id}/{key}
"""
- console.print("[yellow]TODO: Implement test_xcom_head")
- raise NotImplementedError("test_xcom_head not implemented")
+ console.print("[yellow]Testing XCom head for mapped task...")
+
+ response_mapped = sdk_client.xcoms.head(
+ dag_id=dag_info["dag_id"],
+ run_id=dag_info["dag_run_id"],
+ task_id="mapped_task",
+ key="return_value",
+ )
+
+ console.print(" XCom Head Response (Mapped) ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/]
{type(response_mapped).__name__}")
+ console.print(f"[bright_blue]Count:[/] {response_mapped.len}")
+ console.print("=" * 72)
+
+ assert isinstance(response_mapped, XComCountResponse)
+ assert response_mapped.len == 4
+ console.print("[green]✅ XCom head mapped test passed!")
[email protected](reason="TODO: Implement XCom get_sequence_item test")
def test_xcom_get_sequence_item(sdk_client, dag_info):
"""
Test getting XCom sequence item by offset.
Expected: XComSequenceIndexResponse with value
Endpoint: GET
/execution/xcoms/{dag_id}/{run_id}/{task_id}/{key}/item/{offset}
"""
- console.print("[yellow]TODO: Implement test_xcom_get_sequence_item")
- raise NotImplementedError("test_xcom_get_sequence_item not implemented")
+ console.print("[yellow]Testing XCom sequence item access...")
+
+ cases = [
+ {"label": "offset=0", "offset": 0, "expected": "processed_alpha"},
+ {"label": "offset=-1", "offset": -1, "expected": "processed_delta"},
+ {"label": "offset=2", "offset": 2, "expected": "processed_gamma"},
+ ]
+
+ for case in cases:
+ response = sdk_client.xcoms.get_sequence_item(
+ dag_id=dag_info["dag_id"],
+ run_id=dag_info["dag_run_id"],
+ task_id="mapped_task",
+ key="return_value",
+ offset=case["offset"],
+ )
+
+ console.print(f" XCom Sequence Item [{case['label']}] ".center(72,
"="))
+ console.print(f"[bright_blue]Response Type:[/]
{type(response).__name__}")
+ console.print(f"[bright_blue]Value:[/] {response.root}")
+ console.print("=" * 72)
+
+ assert isinstance(response, XComSequenceIndexResponse)
+ assert response.root == case["expected"]
+ console.print("[green]✅ XCom get_sequence_item test passed!")
+
+
+def test_xcom_get_sequence_item_not_found_offset(sdk_client, dag_info):
+ """
+ Test getting non-existent XCom sequence item due to out-of-range offset.
+
+ Expected: ErrorResponse with XCOM_NOT_FOUND error
+ Endpoint: GET
/execution/xcoms/{dag_id}/{run_id}/{task_id}/{key}/item/{offset}
+ """
+ console.print("[yellow]Testing XCom sequence item not found (offset)...")
+
+ response = sdk_client.xcoms.get_sequence_item(
+ dag_id=dag_info["dag_id"],
+ run_id=dag_info["dag_run_id"],
+ task_id="mapped_task",
+ key="return_value",
+ offset=10,
+ )
+
+ console.print(" XCom Sequence Item Not Found (offset) ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
+ console.print(f"[bright_blue]Error:[/] {response.error}")
+ console.print(f"[bright_blue]Detail:[/] {response.detail}")
+ console.print("=" * 72)
+
+ assert isinstance(response, ErrorResponse)
+ assert response.error == ErrorType.XCOM_NOT_FOUND
+ assert response.detail["key"] == "return_value"
+ assert response.detail["offset"] == 10
+ console.print("[green]✅ XCom get_sequence_item_not_found (offset) test
passed!")
[email protected](reason="TODO: Implement XCom get_sequence_item (not found)
test")
-def test_xcom_get_sequence_item_not_found(sdk_client, dag_info):
+
+def test_xcom_get_sequence_item_not_found_wrong_key(sdk_client, dag_info):
"""
- Test getting non-existent XCom sequence item.
+ Test getting non-existent XCom sequence item due to wrong key.
Expected: ErrorResponse with XCOM_NOT_FOUND error
Endpoint: GET
/execution/xcoms/{dag_id}/{run_id}/{task_id}/{key}/item/{offset}
"""
- console.print("[yellow]TODO: Implement
test_xcom_get_sequence_item_not_found")
- raise NotImplementedError("test_xcom_get_sequence_item_not_found not
implemented")
+ console.print("[yellow]Testing XCom sequence item not found (wrong
key)...")
+
+ response_bad_key = sdk_client.xcoms.get_sequence_item(
+ dag_id=dag_info["dag_id"],
+ run_id=dag_info["dag_run_id"],
+ task_id="mapped_task",
+ key="non_existent_key",
+ offset=0,
+ )
+
+ console.print(" XCom Sequence Item Not Found (wrong key) ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/]
{type(response_bad_key).__name__}")
+ console.print(f"[bright_blue]Error:[/] {response_bad_key.error}")
+ console.print("=" * 72)
+
+ assert isinstance(response_bad_key, ErrorResponse)
+ assert response_bad_key.error == ErrorType.XCOM_NOT_FOUND
+
+ console.print("[green]✅ XCom get_sequence_item_not_found (wrong key) test
passed!")
[email protected](reason="TODO: Implement XCom get_sequence_slice test")
Review Comment:
Same comment as above regarding pytest.parameterize
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]